/*
* Bring external RSS feeds into rooms.
*
- * Copyright (c) 2007-2010 by the citadel.org team
+ * Copyright (c) 2007-2012 by the citadel.org team
*
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
+ * This program is open source software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
* GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <stdlib.h>
# include <time.h>
#else
# if HAVE_SYS_TIME_H
-# include <sys/time.h>
+#include <sys/time.h>
# else
-# include <time.h>
+#include <time.h>
# endif
#endif
HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
eNextState RSSAggregator_Terminate(AsyncIO *IO);
+eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
struct CitContext rss_CC;
{
HashPos *At;
+ pthread_mutex_lock(&RSSQueueMutex);
UnlinkRooms(Cfg);
At = GetNewHashPos(RSSFetchUrls, 0);
}
DeleteHashPos(&At);
last_run = time(NULL);
+ pthread_mutex_unlock(&RSSQueueMutex);
}
-
void DeleteRssCfg(void *vptr)
{
rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
UnlinkRSSAggregator(RSSAggr);
return eAbort;
}
+
+eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
+{
+ rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
+
+ EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
+
+
+ UnlinkRSSAggregator(RSSAggr);
+ return eAbort;
+}
+
eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
{
const char *pUrl;
const char *Key;
rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
- RSSAggr->ThisMsg->Msg.cm_fields['M'] = SmashStrBuf(&RSSAggr->ThisMsg->Message);
+ RSSAggr->ThisMsg->Msg.cm_fields['M'] =
+ SmashStrBuf(&RSSAggr->ThisMsg->Message);
CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
}
}
-
-
/*
* Begin a feed parse
*/
"Citadel RSS Client",
RSSAggregator_ParseReply,
RSSAggregator_Terminate,
+ RSSAggregator_TerminateDB,
RSSAggregator_ShutdownAbort))
{
syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
rss_room_counter *Count = NULL;
struct stat statbuf;
char filename[PATH_MAX];
- int fd;
+ int fd;
int Done;
rss_aggregator *RSSAggr = NULL;
rss_aggregator *use_this_RSSAggr = NULL;
close(fd);
FreeStrBuf(&CfgData);
syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
- filename, strerror(errno));
+ filename, strerror(errno));
return;
}
close(fd);
Done = 0;
while (!Done)
{
- Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
- if (StrLength(Line) > 0)
- {
- lPtr = NULL;
- StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
- if (!strcasecmp("rssclient", ChrPtr(CfgType)))
+ Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
+ if (StrLength(Line) > 0)
{
- if (Count == NULL)
- {
- Count = malloc(sizeof(rss_room_counter));
- Count->count = 0;
- }
- Count->count ++;
- RSSAggr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
- memset (RSSAggr, 0, sizeof(rss_aggregator));
- RSSAggr->roomlist_parts = 1;
- RSSAggr->Url = NewStrBuf();
- StrBufExtract_NextToken(RSSAggr->Url, Line, &lPtr, '|');
-
- pthread_mutex_lock(&RSSQueueMutex);
- GetHash(RSSFetchUrls, SKEY(RSSAggr->Url), &vptr);
- use_this_RSSAggr = (rss_aggregator *)vptr;
- if (use_this_RSSAggr != NULL)
- {
- long *QRnumber;
- StrBufAppendBufPlain(use_this_RSSAggr->rooms,
- qrbuf->QRname,
- -1, 0);
- if (use_this_RSSAggr->roomlist_parts == 1)
- {
- use_this_RSSAggr->OtherQRnumbers =
- NewHash(1, lFlathash);
- }
- QRnumber = (long*)malloc(sizeof(long));
- *QRnumber = qrbuf->QRnumber;
- Put(use_this_RSSAggr->OtherQRnumbers,
- LKEY(qrbuf->QRnumber),
- QRnumber,
- NULL);
- use_this_RSSAggr->roomlist_parts++;
-
- pthread_mutex_unlock(&RSSQueueMutex);
-
- FreeStrBuf(&RSSAggr->Url);
- free(RSSAggr);
- RSSAggr = NULL;
- continue;
- }
- pthread_mutex_unlock(&RSSQueueMutex);
-
- RSSAggr->ItemType = RSS_UNSET;
-
- RSSAggr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
-
- pthread_mutex_lock(&RSSQueueMutex);
- Put(RSSFetchUrls, SKEY(RSSAggr->Url), RSSAggr, DeleteRssCfg);
- pthread_mutex_unlock(&RSSQueueMutex);
+ lPtr = NULL;
+ StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
+ if (!strcasecmp("rssclient", ChrPtr(CfgType)))
+ {
+ if (Count == NULL)
+ {
+ Count = malloc(
+ sizeof(rss_room_counter));
+ Count->count = 0;
+ }
+ Count->count ++;
+ RSSAggr = (rss_aggregator *) malloc(
+ sizeof(rss_aggregator));
+
+ memset (RSSAggr, 0, sizeof(rss_aggregator));
+ RSSAggr->QRnumber = qrbuf->QRnumber;
+ RSSAggr->roomlist_parts = 1;
+ RSSAggr->Url = NewStrBuf();
+
+ StrBufExtract_NextToken(RSSAggr->Url,
+ Line,
+ &lPtr,
+ '|');
+
+ pthread_mutex_lock(&RSSQueueMutex);
+ GetHash(RSSFetchUrls,
+ SKEY(RSSAggr->Url),
+ &vptr);
+
+ use_this_RSSAggr = (rss_aggregator *)vptr;
+ if (use_this_RSSAggr != NULL)
+ {
+ long *QRnumber;
+ StrBufAppendBufPlain(
+ use_this_RSSAggr->rooms,
+ qrbuf->QRname,
+ -1, 0);
+ if (use_this_RSSAggr->roomlist_parts==1)
+ {
+ use_this_RSSAggr->OtherQRnumbers
+ = NewHash(1, lFlathash);
+ }
+ QRnumber = (long*)malloc(sizeof(long));
+ *QRnumber = qrbuf->QRnumber;
+ Put(use_this_RSSAggr->OtherQRnumbers,
+ LKEY(qrbuf->QRnumber),
+ QRnumber,
+ NULL);
+ use_this_RSSAggr->roomlist_parts++;
+
+ pthread_mutex_unlock(&RSSQueueMutex);
+
+ FreeStrBuf(&RSSAggr->Url);
+ free(RSSAggr);
+ RSSAggr = NULL;
+ continue;
+ }
+ pthread_mutex_unlock(&RSSQueueMutex);
+
+ RSSAggr->ItemType = RSS_UNSET;
+
+ RSSAggr->rooms = NewStrBufPlain(
+ qrbuf->QRname, -1);
+
+ pthread_mutex_lock(&RSSQueueMutex);
+
+ Put(RSSFetchUrls,
+ SKEY(RSSAggr->Url),
+ RSSAggr,
+ DeleteRssCfg);
+
+ pthread_mutex_unlock(&RSSQueueMutex);
+ }
}
- }
}
if (Count != NULL)
{
Count->QRnumber = qrbuf->QRnumber;
pthread_mutex_lock(&RSSQueueMutex);
- syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
- qrbuf->QRnumber, qrbuf->QRname);
+ syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
+ qrbuf->QRnumber, qrbuf->QRname);
Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
pthread_mutex_unlock(&RSSQueueMutex);
}
* Scan for rooms that have RSS client requests configured
*/
void rssclient_scan(void) {
- static int doing_rssclient = 0;
+ int RSSRoomCount, RSSCount;
rss_aggregator *rptr = NULL;
void *vrptr = NULL;
- HashPos *it;
+ HashPos *it;
long len;
const char *Key;
+ time_t now = time(NULL);
/* Run no more than once every 15 minutes. */
- if ((time(NULL) - last_run) < 900) {
+ if ((now - last_run) < 900) {
+ syslog(LOG_DEBUG,
+ "rssclient: polling interval not yet reached; last run was %ldm%lds ago",
+ ((now - last_run) / 60),
+ ((now - last_run) % 60)
+ );
return;
}
/*
* This is a simple concurrency check to make sure only one rssclient
- * run is done at a time. We could do this with a mutex, but since we
- * don't really require extremely fine granularity here, we'll do it
- * with a static variable instead.
+ * run is done at a time.
*/
- if (doing_rssclient) return;
- doing_rssclient = 1;
- if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
+ pthread_mutex_lock(&RSSQueueMutex);
+ RSSCount = GetCount(RSSFetchUrls);
+ RSSRoomCount = GetCount(RSSQueueRooms);
+ pthread_mutex_unlock(&RSSQueueMutex);
+
+ if ((RSSRoomCount > 0) || (RSSCount > 0)) {
+ syslog(LOG_DEBUG,
+ "rssclient: concurrency check failed; %d rooms and %d url's are queued",
+ RSSRoomCount, RSSCount
+ );
return;
+ }
become_session(&rss_CC);
syslog(LOG_DEBUG, "rssclient started\n");
pthread_mutex_unlock(&RSSQueueMutex);
syslog(LOG_DEBUG, "rssclient ended\n");
- doing_rssclient = 0;
return;
}
RSSFetchUrls = NewHash(1, NULL);
syslog(LOG_INFO, "%s\n", curl_version());
CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
- CtdlRegisterCleanupHook(rss_cleanup);
+ CtdlRegisterEVCleanupHook(rss_cleanup);
}
return "rssclient";
}