/*
* Bring external RSS feeds into rooms.
*
- * Copyright (c) 2007-2010 by the citadel.org team
+ * Copyright (c) 2007-2012 by the citadel.org team
*
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
+ * This program is open source software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
* GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
# include <time.h>
#else
# if HAVE_SYS_TIME_H
-# include <sys/time.h>
+#include <sys/time.h>
# else
-# include <time.h>
+#include <time.h>
# endif
#endif
HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
eNextState RSSAggregator_Terminate(AsyncIO *IO);
+eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
struct CitContext rss_CC;
struct rssnetcfg *rnclist = NULL;
-void AppendLink(StrBuf *Message,
- StrBuf *link,
- StrBuf *LinkTitle,
- const char *Title)
-{
- if (StrLength(link) > 0)
- {
- StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
- StrBufAppendBuf(Message, link, 0);
- StrBufAppendBufPlain(Message, HKEY("\">"), 0);
- if (StrLength(LinkTitle) > 0)
- StrBufAppendBuf(Message, LinkTitle, 0);
- else if ((Title != NULL) && !IsEmptyStr(Title))
- StrBufAppendBufPlain(Message, Title, -1, 0);
- else
- StrBufAppendBuf(Message, link, 0);
- StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
- }
-}
+int RSSClientDebugEnabled = 0;
+#define N ((rss_aggregator*)IO->Data)->QRnumber
+
+#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
+
+#define EVRSSC_syslog(LEVEL, FORMAT, ...) \
+ DBGLOG(LEVEL) syslog(LEVEL, \
+ "IO[%ld]CC[%d][%ld]RSS" FORMAT, \
+ IO->ID, CCID, N, __VA_ARGS__)
+
+#define EVRSSCM_syslog(LEVEL, FORMAT) \
+ DBGLOG(LEVEL) syslog(LEVEL, \
+ "IO[%ld]CC[%d][%ld]RSS" FORMAT, \
+ IO->ID, CCID, N)
+#define EVRSSQ_syslog(LEVEL, FORMAT, ...) \
+ DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT, \
+ __VA_ARGS__)
+#define EVRSSQM_syslog(LEVEL, FORMAT) \
+ DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
+
+#define EVRSSCSM_syslog(LEVEL, FORMAT) \
+ DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT, \
+ IO->ID, N)
void DeleteRoomReference(long QRnumber)
{
DeleteHashPos(&At);
}
-void UnlinkRooms(rss_aggregator *Cfg)
+void UnlinkRooms(rss_aggregator *RSSAggr)
{
- DeleteRoomReference(Cfg->QRnumber);
- if (Cfg->OtherQRnumbers != NULL)
+ DeleteRoomReference(RSSAggr->QRnumber);
+ if (RSSAggr->OtherQRnumbers != NULL)
{
long HKLen;
const char *HK;
HashPos *At;
void *vData;
- At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
+ At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
while (! server_shutting_down &&
- GetNextHashPos(Cfg->OtherQRnumbers,
+ GetNextHashPos(RSSAggr->OtherQRnumbers,
At,
&HKLen, &HK,
&vData) &&
}
}
-void UnlinkRSSAggregator(rss_aggregator *Cfg)
+void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
{
HashPos *At;
- UnlinkRooms(Cfg);
+ pthread_mutex_lock(&RSSQueueMutex);
+ UnlinkRooms(RSSAggr);
At = GetNewHashPos(RSSFetchUrls, 0);
- if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
+ if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
{
DeleteEntryFromHash(RSSFetchUrls, At);
}
DeleteHashPos(&At);
last_run = time(NULL);
+ pthread_mutex_unlock(&RSSQueueMutex);
}
-void FreeNetworkSaveMessage (void *vMsg)
+void DeleteRssCfg(void *vptr)
{
- networker_save_message *Msg = (networker_save_message *) vMsg;
+ rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
+ AsyncIO *IO = &RSSAggr->IO;
+ EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
+
+ FreeStrBuf(&RSSAggr->Url);
+ FreeStrBuf(&RSSAggr->rooms);
+ FreeStrBuf(&RSSAggr->CData);
+ FreeStrBuf(&RSSAggr->Key);
+ DeleteHash(&RSSAggr->OtherQRnumbers);
+
+ DeleteHashPos (&RSSAggr->Pos);
+ DeleteHash (&RSSAggr->Messages);
+ if (RSSAggr->recp.recp_room != NULL)
+ free(RSSAggr->recp.recp_room);
+
+
+ if (RSSAggr->Item != NULL)
+ {
+ flush_rss_item(RSSAggr->Item);
- CtdlFreeMessageContents(&Msg->Msg);
- FreeStrBuf(&Msg->Message);
- FreeStrBuf(&Msg->MsgGUID);
- free(Msg);
+ free(RSSAggr->Item);
+ }
+
+ FreeAsyncIOContents(&RSSAggr->IO);
+ free(RSSAggr);
+}
+
+eNextState RSSAggregator_Terminate(AsyncIO *IO)
+{
+ rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
+
+ EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
+
+
+ UnlinkRSSAggregator(RSSAggr);
+ return eAbort;
+}
+
+eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
+{
+ rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
+
+ EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
+
+
+ UnlinkRSSAggregator(RSSAggr);
+ return eAbort;
}
+eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
+{
+ const char *pUrl;
+ rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
+
+ pUrl = IO->ConnectMe->PlainUrl;
+ if (pUrl == NULL)
+ pUrl = "";
+
+ EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
+
+
+ UnlinkRSSAggregator(RSSAggr);
+ return eAbort;
+}
+
+
eNextState AbortNetworkSaveMessage (AsyncIO *IO)
{
return eAbort; ///TODO
{
long len;
const char *Key;
- rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
+ rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
- Ctx->ThisMsg->Msg.cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message);
+ RSSAggr->ThisMsg->Msg.cm_fields['M'] =
+ SmashStrBuf(&RSSAggr->ThisMsg->Message);
- CtdlSubmitMsg(&Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0);
+ CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
/* write the uidl to the use table so we don't store this item again */
cdb_store(CDB_USETABLE,
- SKEY(Ctx->ThisMsg->MsgGUID),
- &Ctx->ThisMsg->ut,
+ SKEY(RSSAggr->ThisMsg->MsgGUID),
+ &RSSAggr->ThisMsg->ut,
sizeof(struct UseTable) );
- if (GetNextHashPos(Ctx->Messages,
- Ctx->Pos,
+ if (GetNextHashPos(RSSAggr->Messages,
+ RSSAggr->Pos,
&len, &Key,
- (void**) &Ctx->ThisMsg))
+ (void**) &RSSAggr->ThisMsg))
return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
else
return eAbort;
#ifndef DEBUG_RSS
if (cdbut != NULL) {
/* Item has already been seen */
- EV_syslog(LOG_DEBUG,
+ EVRSSC_syslog(LOG_DEBUG,
"%s has already been seen\n",
ChrPtr(Ctx->ThisMsg->MsgGUID));
cdb_free(cdbut);
}
}
-/*
- * Commit a fetched and parsed RSS item to disk
- */
-void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
-{
- networker_save_message *SaveMsg;
- struct MD5Context md5context;
- u_char rawdigest[MD5_DIGEST_LEN];
- int msglen = 0;
- StrBuf *Message;
- StrBuf *guid;
- AsyncIO *IO = &Cfg->IO;
- int n;
-
-
- SaveMsg = (networker_save_message *) malloc(
- sizeof(networker_save_message));
- memset(SaveMsg, 0, sizeof(networker_save_message));
-
- /* Construct a GUID to use in the S_USETABLE table.
- * If one is not present in the item itself, make one up.
- */
- if (ri->guid != NULL) {
- StrBufSpaceToBlank(ri->guid);
- StrBufTrim(ri->guid);
- guid = NewStrBufPlain(HKEY("rss/"));
- StrBufAppendBuf(guid, ri->guid, 0);
- }
- else {
- MD5Init(&md5context);
- if (ri->title != NULL) {
- MD5Update(&md5context,
- (const unsigned char*)SKEY(ri->title));
- }
- if (ri->link != NULL) {
- MD5Update(&md5context,
- (const unsigned char*)SKEY(ri->link));
- }
- MD5Final(rawdigest, &md5context);
- guid = NewStrBufPlain(NULL,
- MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
- StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
- StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
- }
-
- /* translate Item into message. */
- EVM_syslog(LOG_DEBUG, "RSS: translating item...\n");
- if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
- StrBufSpaceToBlank(ri->description);
- SaveMsg->Msg.cm_magic = CTDLMESSAGE_MAGIC;
- SaveMsg->Msg.cm_anon_type = MES_NORMAL;
- SaveMsg->Msg.cm_format_type = FMT_RFC822;
-
- if (ri->guid != NULL) {
- SaveMsg->Msg.cm_fields['E'] = strdup(ChrPtr(ri->guid));
- }
-
- if (ri->author_or_creator != NULL) {
- char *From;
- StrBuf *Encoded = NULL;
- int FromAt;
-
- From = html_to_ascii(ChrPtr(ri->author_or_creator),
- StrLength(ri->author_or_creator),
- 512, 0);
- StrBufPlain(ri->author_or_creator, From, -1);
- StrBufTrim(ri->author_or_creator);
- free(From);
-
- FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
- if (!FromAt && StrLength (ri->author_email) > 0)
- {
- StrBufRFC2047encode(&Encoded, ri->author_or_creator);
- SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded);
- SaveMsg->Msg.cm_fields['P'] =
- SmashStrBuf(&ri->author_email);
- }
- else
- {
- if (FromAt)
- {
- SaveMsg->Msg.cm_fields['A'] =
- SmashStrBuf(&ri->author_or_creator);
- SaveMsg->Msg.cm_fields['P'] =
- strdup(SaveMsg->Msg.cm_fields['A']);
- }
- else
- {
- StrBufRFC2047encode(&Encoded,
- ri->author_or_creator);
- SaveMsg->Msg.cm_fields['A'] =
- SmashStrBuf(&Encoded);
- SaveMsg->Msg.cm_fields['P'] =
- strdup("rss@localhost");
-
- }
- if (ri->pubdate <= 0) {
- ri->pubdate = time(NULL);
- }
- }
- }
- else {
- SaveMsg->Msg.cm_fields['A'] = strdup("rss");
- }
-
- SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME);
- if (ri->title != NULL) {
- long len;
- char *Sbj;
- StrBuf *Encoded, *QPEncoded;
-
- QPEncoded = NULL;
- StrBufSpaceToBlank(ri->title);
- len = StrLength(ri->title);
- Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
- len = strlen(Sbj);
- if (Sbj[len - 1] == '\n')
- {
- len --;
- Sbj[len] = '\0';
- }
- Encoded = NewStrBufPlain(Sbj, len);
- free(Sbj);
-
- StrBufTrim(Encoded);
- StrBufRFC2047encode(&QPEncoded, Encoded);
-
- SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded);
- FreeStrBuf(&Encoded);
- }
- SaveMsg->Msg.cm_fields['T'] = malloc(64);
- snprintf(SaveMsg->Msg.cm_fields['T'], 64, "%ld", ri->pubdate);
- if (ri->channel_title != NULL) {
- if (StrLength(ri->channel_title) > 0) {
- SaveMsg->Msg.cm_fields['O'] =
- strdup(ChrPtr(ri->channel_title));
- }
- }
- if (ri->link == NULL)
- ri->link = NewStrBufPlain(HKEY(""));
-
-#if 0 /* temporarily disable shorter urls. */
- SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
- GetShorterUrls(ri->description);
-#endif
-
- msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
-
- Message = NewStrBufPlain(NULL, StrLength(ri->description));
-
- StrBufPlain(Message, HKEY(
- "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
- "<html><body>\n"));
-#if 0 /* disable shorter url for now. */
- SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
-#endif
- StrBufAppendBuf(Message, ri->description, 0);
- StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
-
- AppendLink(Message, ri->link, ri->linkTitle, NULL);
- AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
- StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
-
- SaveMsg->MsgGUID = guid;
- SaveMsg->Message = Message;
-
- n = GetCount(Cfg->Messages) + 1;
- Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage);
-}
-
-
-
/*
* Begin a feed parse
*/
-int rss_do_fetching(rss_aggregator *Cfg)
+int rss_do_fetching(rss_aggregator *RSSAggr)
{
+ AsyncIO *IO = &RSSAggr->IO;
rss_item *ri;
time_t now;
now = time(NULL);
- if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
+ if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
return 0;
ri = (rss_item*) malloc(sizeof(rss_item));
memset(ri, 0, sizeof(rss_item));
- Cfg->Item = ri;
+ RSSAggr->Item = ri;
- if (! InitcURLIOStruct(&Cfg->IO,
- Cfg,
+ if (! InitcURLIOStruct(&RSSAggr->IO,
+ RSSAggr,
"Citadel RSS Client",
RSSAggregator_ParseReply,
RSSAggregator_Terminate,
+ RSSAggregator_TerminateDB,
RSSAggregator_ShutdownAbort))
{
- syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
+ EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
return 0;
}
- safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host,
- ChrPtr(Cfg->Url),
- sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host));
+ safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
+ ChrPtr(RSSAggr->Url),
+ sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
- syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
- ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80);
- CurlPrepareURL(Cfg->IO.ConnectMe);
+ EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
+ ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
+ CurlPrepareURL(RSSAggr->IO.ConnectMe);
- QueueCurlContext(&Cfg->IO);
+ QueueCurlContext(&RSSAggr->IO);
return 1;
}
-
-void DeleteRssCfg(void *vptr)
-{
- rss_aggregator *rncptr = (rss_aggregator *)vptr;
- AsyncIO *IO = &rncptr->IO;
- EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
-
- FreeStrBuf(&rncptr->Url);
- FreeStrBuf(&rncptr->rooms);
- FreeStrBuf(&rncptr->CData);
- FreeStrBuf(&rncptr->Key);
- FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
- DeleteHash(&rncptr->OtherQRnumbers);
- FreeURL(&rncptr->IO.ConnectMe);
-
- DeleteHashPos (&rncptr->Pos);
- DeleteHash (&rncptr->Messages);
- if (rncptr->recp.recp_room != NULL)
- free(rncptr->recp.recp_room);
-
-
- if (rncptr->Item != NULL)
- {
- FreeStrBuf(&rncptr->Item->guid);
- FreeStrBuf(&rncptr->Item->title);
- FreeStrBuf(&rncptr->Item->link);
- FreeStrBuf(&rncptr->Item->linkTitle);
- FreeStrBuf(&rncptr->Item->reLink);
- FreeStrBuf(&rncptr->Item->reLinkTitle);
- FreeStrBuf(&rncptr->Item->description);
- FreeStrBuf(&rncptr->Item->channel_title);
- FreeStrBuf(&rncptr->Item->author_or_creator);
- FreeStrBuf(&rncptr->Item->author_url);
- FreeStrBuf(&rncptr->Item->author_email);
-
- free(rncptr->Item);
- }
- free(rncptr);
-}
-
-eNextState RSSAggregator_Terminate(AsyncIO *IO)
-{
- rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
-
- EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
-
-
- UnlinkRSSAggregator(rncptr);
- return eAbort;
-}
-eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
-{
- const char *pUrl;
- rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
-
- pUrl = IO->ConnectMe->PlainUrl;
- if (pUrl == NULL)
- pUrl = "";
-
- EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
-
-
- UnlinkRSSAggregator(rncptr);
- return eAbort;
-}
-
/*
* Scan a room's netconfig to determine whether it is requesting any RSS feeds
*/
rss_room_counter *Count = NULL;
struct stat statbuf;
char filename[PATH_MAX];
- int fd;
+ int fd;
int Done;
- rss_aggregator *rncptr = NULL;
- rss_aggregator *use_this_rncptr = NULL;
+ rss_aggregator *RSSAggr = NULL;
+ rss_aggregator *use_this_RSSAggr = NULL;
void *vptr;
const char *CfgPtr, *lPtr;
const char *Err;
pthread_mutex_lock(&RSSQueueMutex);
if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
{
- syslog(LOG_DEBUG,
- "rssclient: [%ld] %s already in progress.\n",
- qrbuf->QRnumber,
- qrbuf->QRname);
+ EVRSSQ_syslog(LOG_DEBUG,
+ "rssclient: [%ld] %s already in progress.\n",
+ qrbuf->QRnumber,
+ qrbuf->QRname);
pthread_mutex_unlock(&RSSQueueMutex);
return;
}
return;
if (fstat(fd, &statbuf) == -1) {
- syslog(LOG_DEBUG,
- "ERROR: could not stat configfile '%s' - %s\n",
- filename,
- strerror(errno));
+ EVRSSQ_syslog(LOG_DEBUG,
+ "ERROR: could not stat configfile '%s' - %s\n",
+ filename,
+ strerror(errno));
return;
}
if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
close(fd);
FreeStrBuf(&CfgData);
- syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
- filename, strerror(errno));
+ EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s<br>\n",
+ filename, strerror(errno));
return;
}
close(fd);
Done = 0;
while (!Done)
{
- Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
- if (StrLength(Line) > 0)
- {
- lPtr = NULL;
- StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
- if (!strcasecmp("rssclient", ChrPtr(CfgType)))
+ Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
+ if (StrLength(Line) > 0)
{
- if (Count == NULL)
- {
- Count = malloc(sizeof(rss_room_counter));
- Count->count = 0;
- }
- Count->count ++;
- rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
- memset (rncptr, 0, sizeof(rss_aggregator));
- rncptr->roomlist_parts = 1;
- rncptr->Url = NewStrBuf();
- StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
-
- pthread_mutex_lock(&RSSQueueMutex);
- GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
- use_this_rncptr = (rss_aggregator *)vptr;
- if (use_this_rncptr != NULL)
- {
- long *QRnumber;
- StrBufAppendBufPlain(use_this_rncptr->rooms,
- qrbuf->QRname,
- -1, 0);
- if (use_this_rncptr->roomlist_parts == 1)
- {
- use_this_rncptr->OtherQRnumbers =
- NewHash(1, lFlathash);
- }
- QRnumber = (long*)malloc(sizeof(long));
- *QRnumber = qrbuf->QRnumber;
- Put(use_this_rncptr->OtherQRnumbers,
- LKEY(qrbuf->QRnumber),
- QRnumber,
- NULL);
- use_this_rncptr->roomlist_parts++;
-
- pthread_mutex_unlock(&RSSQueueMutex);
-
- FreeStrBuf(&rncptr->Url);
- free(rncptr);
- rncptr = NULL;
- continue;
- }
- pthread_mutex_unlock(&RSSQueueMutex);
-
- rncptr->ItemType = RSS_UNSET;
-
- rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
-
- pthread_mutex_lock(&RSSQueueMutex);
- Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
- pthread_mutex_unlock(&RSSQueueMutex);
+ lPtr = NULL;
+ StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
+ if (!strcasecmp("rssclient", ChrPtr(CfgType)))
+ {
+ if (Count == NULL)
+ {
+ Count = malloc(
+ sizeof(rss_room_counter));
+ Count->count = 0;
+ }
+ Count->count ++;
+ RSSAggr = (rss_aggregator *) malloc(
+ sizeof(rss_aggregator));
+
+ memset (RSSAggr, 0, sizeof(rss_aggregator));
+ RSSAggr->QRnumber = qrbuf->QRnumber;
+ RSSAggr->roomlist_parts = 1;
+ RSSAggr->Url = NewStrBuf();
+
+ StrBufExtract_NextToken(RSSAggr->Url,
+ Line,
+ &lPtr,
+ '|');
+
+ pthread_mutex_lock(&RSSQueueMutex);
+ GetHash(RSSFetchUrls,
+ SKEY(RSSAggr->Url),
+ &vptr);
+
+ use_this_RSSAggr = (rss_aggregator *)vptr;
+ if (use_this_RSSAggr != NULL)
+ {
+ long *QRnumber;
+ StrBufAppendBufPlain(
+ use_this_RSSAggr->rooms,
+ qrbuf->QRname,
+ -1, 0);
+ if (use_this_RSSAggr->roomlist_parts==1)
+ {
+ use_this_RSSAggr->OtherQRnumbers
+ = NewHash(1, lFlathash);
+ }
+ QRnumber = (long*)malloc(sizeof(long));
+ *QRnumber = qrbuf->QRnumber;
+ Put(use_this_RSSAggr->OtherQRnumbers,
+ LKEY(qrbuf->QRnumber),
+ QRnumber,
+ NULL);
+ use_this_RSSAggr->roomlist_parts++;
+
+ pthread_mutex_unlock(&RSSQueueMutex);
+
+ FreeStrBuf(&RSSAggr->Url);
+ free(RSSAggr);
+ RSSAggr = NULL;
+ continue;
+ }
+ pthread_mutex_unlock(&RSSQueueMutex);
+
+ RSSAggr->ItemType = RSS_UNSET;
+
+ RSSAggr->rooms = NewStrBufPlain(
+ qrbuf->QRname, -1);
+
+ pthread_mutex_lock(&RSSQueueMutex);
+
+ Put(RSSFetchUrls,
+ SKEY(RSSAggr->Url),
+ RSSAggr,
+ DeleteRssCfg);
+
+ pthread_mutex_unlock(&RSSQueueMutex);
+ }
}
- }
}
if (Count != NULL)
{
Count->QRnumber = qrbuf->QRnumber;
pthread_mutex_lock(&RSSQueueMutex);
- syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
+ EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n",
qrbuf->QRnumber, qrbuf->QRname);
Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
pthread_mutex_unlock(&RSSQueueMutex);
* Scan for rooms that have RSS client requests configured
*/
void rssclient_scan(void) {
- static int doing_rssclient = 0;
+ int RSSRoomCount, RSSCount;
rss_aggregator *rptr = NULL;
void *vrptr = NULL;
- HashPos *it;
+ HashPos *it;
long len;
const char *Key;
+ time_t now = time(NULL);
/* Run no more than once every 15 minutes. */
- if ((time(NULL) - last_run) < 900) {
+ if ((now - last_run) < 900) {
+ EVRSSQ_syslog(LOG_DEBUG,
+ "Client: polling interval not yet reached; last run was %ldm%lds ago",
+ ((now - last_run) / 60),
+ ((now - last_run) % 60)
+ );
return;
}
/*
* This is a simple concurrency check to make sure only one rssclient
- * run is done at a time. We could do this with a mutex, but since we
- * don't really require extremely fine granularity here, we'll do it
- * with a static variable instead.
+ * run is done at a time.
*/
- if (doing_rssclient) return;
- doing_rssclient = 1;
- if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
+ pthread_mutex_lock(&RSSQueueMutex);
+ RSSCount = GetCount(RSSFetchUrls);
+ RSSRoomCount = GetCount(RSSQueueRooms);
+ pthread_mutex_unlock(&RSSQueueMutex);
+
+ if ((RSSRoomCount > 0) || (RSSCount > 0)) {
+ EVRSSQ_syslog(LOG_DEBUG,
+ "rssclient: concurrency check failed; %d rooms and %d url's are queued",
+ RSSRoomCount, RSSCount
+ );
return;
+ }
become_session(&rss_CC);
- syslog(LOG_DEBUG, "rssclient started\n");
+ EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
CtdlForEachRoom(rssclient_scan_room, NULL);
pthread_mutex_lock(&RSSQueueMutex);
DeleteHashPos(&it);
pthread_mutex_unlock(&RSSQueueMutex);
- syslog(LOG_DEBUG, "rssclient ended\n");
- doing_rssclient = 0;
+ EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
return;
}
DeleteHash(&RSSQueueRooms);
}
+void LogDebugEnableRSSClient(const int n)
+{
+ RSSClientDebugEnabled = n;
+}
CTDL_MODULE_INIT(rssclient)
{
RSSFetchUrls = NewHash(1, NULL);
syslog(LOG_INFO, "%s\n", curl_version());
CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
- CtdlRegisterCleanupHook(rss_cleanup);
+ CtdlRegisterEVCleanupHook(rss_cleanup);
+ CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
}
return "rssclient";
}