2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2012 by the citadel.org team
6 * This program is open source software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 3.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
33 #include <sys/types.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
40 #include "citserver.h"
44 #include "ctdl_module.h"
46 #include "parsedate.h"
48 #include "citadel_dirs.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
/*
 * Numeric tags with a TMP_ prefix. Nothing in the visible part of this file
 * references them -- NOTE(review): confirm where they are consumed.
 */
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
/*
 * Shared module state. RSSQueueRooms is looked up by room number
 * (LKEY(QRnumber)) and RSSFetchUrls by feed URL (SKEY(Url)) in the
 * functions of this file.
 */
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
/* Forward declarations of the callbacks handed to InitcURLIOStruct(). */
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
/* Context filled in by the module initialiser and adopted by rssclient_scan(). */
68 struct CitContext rss_CC;
/* NOTE(review): not referenced anywhere in the visible code. */
70 struct rssnetcfg *rnclist = NULL;
/*
 * Look up the RSSQueueRooms entry keyed by QRnumber and delete it from the
 * hash once its counter reads zero.
 *
 * NOTE(review): the statement that modifies pRoomC->count is not visible
 * here; presumably the counter is decremented before the test -- confirm.
 * NOTE(review): no locking happens in the visible lines; callers are
 * presumably expected to hold RSSQueueMutex -- confirm.
 */
73 void DeleteRoomReference(long QRnumber)
79 rss_room_counter *pRoomC;
81 At = GetNewHashPos(RSSQueueRooms, 0);
/* Position the cursor on the entry for this room, if there is one. */
83 if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
85 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
88 pRoomC = (rss_room_counter *) vData;
/* Only an entry whose count is zero is removed. */
90 if (pRoomC->count == 0)
91 DeleteEntryFromHash(RSSQueueRooms, At);
/*
 * Drop the room reference for Cfg->QRnumber, then do the same for every room
 * number stored in Cfg->OtherQRnumbers when that hash exists.
 * The walk over OtherQRnumbers ends early once server_shutting_down is set.
 */
97 void UnlinkRooms(rss_aggregator *Cfg)
99 DeleteRoomReference(Cfg->QRnumber);
100 if (Cfg->OtherQRnumbers != NULL)
107 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
108 while (! server_shutting_down &&
109 GetNextHashPos(Cfg->OtherQRnumbers,
/* Each stored value is a pointer to a long holding a room number. */
115 long *lData = (long*) vData;
116 DeleteRoomReference(*lData);
/*
 * Remove the entry keyed by Cfg->Url from RSSFetchUrls and stamp last_run
 * with the current time. Both steps run between the lock and unlock of
 * RSSQueueMutex.
 *
 * NOTE(review): rssclient_scan() calls this function after it has locked
 * RSSQueueMutex itself, and the mutex is created with default attributes
 * (pthread_mutex_init(..., NULL)) -- check for a self-deadlock.
 */
123 void UnlinkRSSAggregator(rss_aggregator *Cfg)
127 pthread_mutex_lock(&RSSQueueMutex);
130 At = GetNewHashPos(RSSFetchUrls, 0);
/* Delete the aggregator's hash entry only if the URL is still registered. */
131 if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
133 DeleteEntryFromHash(RSSFetchUrls, At);
/* rssclient_scan() compares last_run against the 900 second interval. */
136 last_run = time(NULL);
137 pthread_mutex_unlock(&RSSQueueMutex);
/*
 * Release the resources hanging off an rss_aggregator that is handed in as an
 * untyped pointer: its string buffers, hashes, hash cursor, recipient room
 * string, pending item and async I/O contents.
 *
 * NOTE(review): the untyped signature suggests use as a hash destructor --
 * confirm where it is registered. Whether the struct itself is freed is not
 * visible in these lines.
 */
140 void DeleteRssCfg(void *vptr)
142 rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
/* NOTE(review): IO is presumably consumed by the EVM_syslog macro -- confirm. */
143 AsyncIO *IO = &RSSAggr->IO;
144 EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
146 FreeStrBuf(&RSSAggr->Url);
147 FreeStrBuf(&RSSAggr->rooms);
148 FreeStrBuf(&RSSAggr->CData);
149 FreeStrBuf(&RSSAggr->Key);
150 DeleteHash(&RSSAggr->OtherQRnumbers);
152 DeleteHashPos (&RSSAggr->Pos);
153 DeleteHash (&RSSAggr->Messages);
/* free(NULL) is a no-op, so this guard is not strictly required. */
154 if (RSSAggr->recp.recp_room != NULL)
155 free(RSSAggr->recp.recp_room);
158 if (RSSAggr->Item != NULL)
160 flush_rss_item(RSSAggr->Item);
165 FreeAsyncIOContents(&RSSAggr->IO);
/*
 * Termination callback passed to InitcURLIOStruct(): logs the event and
 * unlinks the aggregator stored in IO->Data from the fetch queue.
 * The value returned is not visible in these lines.
 */
169 eNextState RSSAggregator_Terminate(AsyncIO *IO)
171 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
173 EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
176 UnlinkRSSAggregator(RSSAggr);
/*
 * Second termination callback passed to InitcURLIOStruct(). The visible
 * statements match RSSAggregator_Terminate(), including the log text, so the
 * two cannot be told apart in the log.
 * The value returned is not visible in these lines.
 */
180 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
182 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
184 EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
187 UnlinkRSSAggregator(RSSAggr);
/*
 * Shutdown callback passed to InitcURLIOStruct(): logs the URL that was being
 * fetched and unlinks the aggregator stored in IO->Data.
 * The value returned is not visible in these lines.
 */
191 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
194 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
/* NOTE(review): no NULL check on IO->ConnectMe is visible here -- confirm. */
196 pUrl = IO->ConnectMe->PlainUrl;
200 EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
203 UnlinkRSSAggregator(RSSAggr);
/*
 * Placeholder abort handler: it ignores IO and returns eAbort. The trailing
 * TODO marks it as unfinished.
 */
208 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
210 return eAbort; ///TODO
/*
 * Store the aggregator's current message (ThisMsg): move its text into the
 * 'M' field, submit it to the aggregator's recipients, and write its use
 * table record under the message GUID. If the Messages hash yields another
 * entry, the use table check is scheduled for it as the next DB operation.
 * What is returned when no further message exists is not visible here.
 */
213 eNextState RSSSaveMessage(AsyncIO *IO)
217 rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
/* SmashStrBuf() takes the buffer by address; its result becomes field 'M'. */
219 RSSAggr->ThisMsg->Msg.cm_fields['M'] =
220 SmashStrBuf(&RSSAggr->ThisMsg->Message);
222 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
224 /* write the uidl to the use table so we don't store this item again */
225 cdb_store(CDB_USETABLE,
226 SKEY(RSSAggr->ThisMsg->MsgGUID),
227 &RSSAggr->ThisMsg->ut,
228 sizeof(struct UseTable) );
/* Advance ThisMsg to the next queued message, if any. */
230 if (GetNextHashPos(RSSAggr->Messages,
233 (void**) &RSSAggr->ThisMsg))
234 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
/*
 * Check the current message (ThisMsg) against the use table. The GUID and
 * the current time are written into the message's UseTable record, then the
 * GUID is looked up in CDB_USETABLE. For an item that was seen before, the
 * record is stored again to refresh its timestamp and the check is
 * rescheduled for the next message; otherwise RSSSaveMessage is scheduled.
 *
 * NOTE(review): the branch structure between these statements is not visible
 * here -- confirm which statements sit inside the "already seen" branch.
 */
239 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
243 struct cdbdata *cdbut;
244 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
246 /* Find out if we've already seen this item */
/*
 * NOTE(review): strcpy() does no length check and the size of ut_msgid is
 * not visible here; a GUID longer than the field would overflow it.
 */
247 strcpy(Ctx->ThisMsg->ut.ut_msgid,
248 ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
249 Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
251 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
254 /* Item has already been seen */
256 "%s has already been seen\n",
257 ChrPtr(Ctx->ThisMsg->MsgGUID));
260 /* rewrite the record anyway, to update the timestamp */
261 cdb_store(CDB_USETABLE,
262 SKEY(Ctx->ThisMsg->MsgGUID),
263 &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
/* Move on to the next queued message and repeat the check for it. */
265 if (GetNextHashPos(Ctx->Messages,
268 (void**) &Ctx->ThisMsg))
269 return NextDBOperation(
271 RSS_FetchNetworkUsetableEntry);
278 NextDBOperation(IO, RSSSaveMessage);
/*
 * Start the fetch for one aggregator: test its next_poll time, allocate a
 * zeroed rss_item, initialise the aggregator's AsyncIO for cURL with the
 * parse/terminate/shutdown callbacks, parse and prepare the feed URL, and
 * queue the cURL context.
 *
 * The return statements are not visible here. rssclient_scan() unlinks the
 * aggregator when this function returns zero.
 */
286 int rss_do_fetching(rss_aggregator *Cfg)
/* A next_poll of 0 never defers the fetch. */
293 if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
/* NOTE(review): no NULL check is visible between malloc() and memset(). */
296 ri = (rss_item*) malloc(sizeof(rss_item));
297 memset(ri, 0, sizeof(rss_item));
300 if (! InitcURLIOStruct(&Cfg->IO,
302 "Citadel RSS Client",
303 RSSAggregator_ParseReply,
304 RSSAggregator_Terminate,
305 RSSAggregator_TerminateDB,
306 RSSAggregator_ShutdownAbort))
308 syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
/* The copy is bounded by the size of the destination cs_host field. */
312 safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host,
314 sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host));
316 syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
/* NOTE(review): the 80 is presumably the default port -- confirm in ParseURL. */
317 ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80);
318 CurlPrepareURL(Cfg->IO.ConnectMe);
320 QueueCurlContext(&Cfg->IO);
325 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
/*
 * Per-room callback that rssclient_scan() hands to CtdlForEachRoom().
 *
 * A room that already has an entry in RSSQueueRooms is reported as in
 * progress. Otherwise the room's netconfig file is read whole and walked
 * line by line. For each line whose first '|'-separated token equals
 * "rssclient" (compared case-insensitively) a new rss_aggregator is built
 * for the URL on that line. If RSSFetchUrls already holds an aggregator for
 * that URL, this room is appended to the existing one instead.
 * The room's counter is stored in RSSQueueRooms under the room number.
 *
 * qrbuf - the room being examined.
 * data  - not referenced in the visible lines; rssclient_scan() passes NULL.
 *
 * server_shutting_down is polled between the file and parsing steps.
 * NOTE(review): the statements guarded by those checks, and the close of fd,
 * are not visible here -- confirm every exit path releases fd and buffers.
 */
327 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
329 StrBuf *CfgData=NULL;
332 rss_room_counter *Count = NULL;
334 char filename[PATH_MAX];
337 rss_aggregator *RSSAggr = NULL;
338 rss_aggregator *use_this_RSSAggr = NULL;
340 const char *CfgPtr, *lPtr;
/* The lookup in the shared room queue happens with RSSQueueMutex held. */
343 pthread_mutex_lock(&RSSQueueMutex);
344 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
347 "rssclient: [%ld] %s already in progress.\n",
350 pthread_mutex_unlock(&RSSQueueMutex);
353 pthread_mutex_unlock(&RSSQueueMutex);
/* Build the path of this room's config file under ctdl_netcfg_dir. */
355 assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
357 if (server_shutting_down)
360 /* Only do net processing for rooms that have netconfigs */
/* A flags value of 0 is O_RDONLY on common platforms. */
361 fd = open(filename, 0);
364 "rssclient: %s no config.\n",
369 if (server_shutting_down)
372 if (fstat(fd, &statbuf) == -1) {
374 "ERROR: could not stat configfile '%s' - %s\n",
380 if (server_shutting_down)
/* Buffer sized from the file size reported by fstat(), plus one byte. */
383 CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
385 if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
387 FreeStrBuf(&CfgData);
/*
 * NOTE(review): this logs strerror(errno) although the read call also
 * returns an error string through Err, and errno may have changed since the
 * read -- confirm which message is wanted.
 */
388 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
389 filename, strerror(errno));
393 if (server_shutting_down)
397 CfgType = NewStrBuf();
398 Line = NewStrBufPlain(NULL, StrLength(CfgData));
/* Take the next line of the config; Done is set when the call returns 0. */
402 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
403 if (StrLength(Line) > 0)
/* The first '|'-separated field of the line is its type. */
406 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
407 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
412 sizeof(rss_room_counter));
416 RSSAggr = (rss_aggregator *) malloc(
417 sizeof(rss_aggregator));
419 memset (RSSAggr, 0, sizeof(rss_aggregator));
420 RSSAggr->QRnumber = qrbuf->QRnumber;
421 RSSAggr->roomlist_parts = 1;
422 RSSAggr->Url = NewStrBuf();
424 StrBufExtract_NextToken(RSSAggr->Url,
/* Check, under the lock, for an aggregator already registered in RSSFetchUrls. */
429 pthread_mutex_lock(&RSSQueueMutex);
430 GetHash(RSSFetchUrls,
434 use_this_RSSAggr = (rss_aggregator *)vptr;
435 if (use_this_RSSAggr != NULL)
/* Existing aggregator: extend its room list with this room. */
438 StrBufAppendBufPlain(
439 use_this_RSSAggr->rooms,
/* The OtherQRnumbers hash is created when the first extra room is added. */
442 if (use_this_RSSAggr->roomlist_parts==1)
444 use_this_RSSAggr->OtherQRnumbers
445 = NewHash(1, lFlathash);
/* NOTE(review): no NULL check on this malloc() is visible here. */
447 QRnumber = (long*)malloc(sizeof(long));
448 *QRnumber = qrbuf->QRnumber;
449 Put(use_this_RSSAggr->OtherQRnumbers,
450 LKEY(qrbuf->QRnumber),
453 use_this_RSSAggr->roomlist_parts++;
455 pthread_mutex_unlock(&RSSQueueMutex);
/* The URL buffer of the aggregator allocated above is released here. */
457 FreeStrBuf(&RSSAggr->Url);
462 pthread_mutex_unlock(&RSSQueueMutex);
464 RSSAggr->ItemType = RSS_UNSET;
466 RSSAggr->rooms = NewStrBufPlain(
469 pthread_mutex_lock(&RSSQueueMutex);
476 pthread_mutex_unlock(&RSSQueueMutex);
/* Record the room in RSSQueueRooms, keyed by its room number. */
482 Count->QRnumber = qrbuf->QRnumber;
483 pthread_mutex_lock(&RSSQueueMutex);
484 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
485 qrbuf->QRnumber, qrbuf->QRname);
486 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
487 pthread_mutex_unlock(&RSSQueueMutex);
489 FreeStrBuf(&CfgData);
490 FreeStrBuf(&CfgType);
495 * Scan for rooms that have RSS client requests configured
/*
 * Timer hook (registered for EVT_TIMER by the module initialiser) that runs
 * one RSS polling pass.
 *
 * The pass is skipped when fewer than 900 seconds have elapsed since
 * last_run, or when RSSFetchUrls or RSSQueueRooms still contain entries.
 * Otherwise the function adopts the rss_CC context, has rssclient_scan_room()
 * visit every room, and then starts a fetch for each aggregator in
 * RSSFetchUrls. An aggregator for which rss_do_fetching() returns zero is
 * unlinked.
 */
497 void rssclient_scan(void) {
498 int RSSRoomCount, RSSCount;
499 rss_aggregator *rptr = NULL;
504 time_t now = time(NULL);
506 /* Run no more than once every 15 minutes. */
507 if ((now - last_run) < 900) {
509 "rssclient: polling interval not yet reached; last run was %ldm%lds ago",
510 ((now - last_run) / 60),
511 ((now - last_run) % 60)
517 * This is a simple concurrency check to make sure only one rssclient
518 * run is done at a time.
/* Both counts are read under the lock. */
520 pthread_mutex_lock(&RSSQueueMutex);
521 RSSCount = GetCount(RSSFetchUrls);
522 RSSRoomCount = GetCount(RSSQueueRooms);
523 pthread_mutex_unlock(&RSSQueueMutex);
525 if ((RSSRoomCount > 0) || (RSSCount > 0)) {
527 "rssclient: concurrency check failed; %d rooms and %d url's are queued",
528 RSSRoomCount, RSSCount
533 become_session(&rss_CC);
534 syslog(LOG_DEBUG, "rssclient started\n");
535 CtdlForEachRoom(rssclient_scan_room, NULL);
/*
 * NOTE(review): RSSQueueMutex is held across the loop below, yet
 * UnlinkRSSAggregator() locks the same mutex, which is created with default
 * attributes -- check for a self-deadlock. The unlink also deletes from
 * RSSFetchUrls while the hash is being iterated.
 */
537 pthread_mutex_lock(&RSSQueueMutex);
539 it = GetNewHashPos(RSSFetchUrls, 0);
540 while (!server_shutting_down &&
541 GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
543 rptr = (rss_aggregator *)vrptr;
544 if (!rss_do_fetching(rptr))
545 UnlinkRSSAggregator(rptr);
548 pthread_mutex_unlock(&RSSQueueMutex);
550 syslog(LOG_DEBUG, "rssclient ended\n");
/*
 * Cleanup hook (registered through CtdlRegisterEVCleanupHook by the module
 * initialiser): deletes the two module-level hashes. The mutex is not
 * destroyed, because that call is commented out and marked TODO.
 */
554 void rss_cleanup(void)
556 /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
557 DeleteHash(&RSSFetchUrls);
558 DeleteHash(&RSSQueueRooms);
/*
 * Module initialiser: fills the rss_CC system context, initialises
 * RSSQueueMutex with default attributes, creates the two hashes, logs the
 * libcurl version, and registers rssclient_scan() as a timer hook and
 * rss_cleanup() as a cleanup hook.
 *
 * NOTE(review): any guard around these calls and the value the initialiser
 * returns are not visible here.
 */
562 CTDL_MODULE_INIT(rssclient)
566 CtdlFillSystemContext(&rss_CC, "rssclient");
567 pthread_mutex_init(&RSSQueueMutex, NULL);
/* RSSQueueRooms is created with lFlathash; RSSFetchUrls passes NULL instead. */
568 RSSQueueRooms = NewHash(1, lFlathash);
569 RSSFetchUrls = NewHash(1, NULL);
570 syslog(LOG_INFO, "%s\n", curl_version());
571 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
572 CtdlRegisterEVCleanupHook(rss_cleanup);