2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2012 by the citadel.org team
6 * This program is open source software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 3.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
33 #include <sys/types.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
40 #include "citserver.h"
44 #include "ctdl_module.h"
46 #include "parsedate.h"
48 #include "citadel_dirs.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
/*
 * NOTE(review): the three TMP_* constants below are not referenced anywhere
 * in the code visible here; confirm they are still needed.
 */
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
/*
 * Shared queue state.  RSSQueueMutex must be held while touching either hash:
 *  - RSSQueueRooms: room number (LKEY) -> rss_room_counter, one entry for
 *    each room whose feeds are currently queued;
 *  - RSSFetchUrls:  feed URL (SKEY) -> rss_aggregator, one entry for each
 *    distinct feed URL.
 */
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
/* AsyncIO termination callbacks handed to InitcURLIOStruct() in rss_do_fetching(). */
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
/* System session that rssclient_scan() becomes while it walks the rooms. */
68 struct CitContext rss_CC;
/* NOTE(review): rnclist is not referenced in the code visible here; possibly dead. */
70 struct rssnetcfg *rnclist = NULL;
/* Non-zero enables LOG_DEBUG output; set through the "rssclient" debug flag hook. */
71 int RSSClientDebugEnabled = 0;
/*
 * Room number of the aggregator attached to the AsyncIO `IO` in scope; used
 * by the logging macros.  NOTE(review): a one-letter macro replaces every later
 * occurrence of the identifier N in this file; a longer name would be safer.
 */
72 #define N ((rss_aggregator*)IO->Data)->QRnumber
/*
 * Guard prefix for the logging macros: the statement that follows runs unless
 * LEVEL is LOG_DEBUG and RSSClientDebugEnabled is 0.
 * NOTE(review): this expands to an unbraced `if`, so an `else` written right
 * after a logging-macro call would bind to it (dangling else).  LEVEL is also
 * used unparenthesised.
 */
74 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
/*
 * Logging helpers, all gated by DBGLOG():
 *  - EVRSSC_syslog / EVRSSCM_syslog: prefix "IO[<IO->ID>]CC[<CCID>][<room>]RSS".
 *    They need an `AsyncIO *IO` in scope (CCID is defined outside this view).
 *    The ...M form is for format strings without extra arguments and
 *    presumably passes the same three prefix values - confirm.
 *  - EVRSSQ_syslog / EVRSSQM_syslog: plain "RSS" prefix and no IO context.
 *    They are used for queue-level messages.
 *  - EVRSSCSM_syslog: prefix "IO[%ld][%ld]RSS" without the CC id.  The two
 *    values are presumably IO->ID and the room number - confirm.
 */
76 #define EVRSSC_syslog(LEVEL, FORMAT, ...) \
77 DBGLOG(LEVEL) syslog(LEVEL, \
78 "IO[%ld]CC[%d][%ld]RSS" FORMAT, \
79 IO->ID, CCID, N, __VA_ARGS__)
81 #define EVRSSCM_syslog(LEVEL, FORMAT) \
82 DBGLOG(LEVEL) syslog(LEVEL, \
83 "IO[%ld]CC[%d][%ld]RSS" FORMAT, \
86 #define EVRSSQ_syslog(LEVEL, FORMAT, ...) \
87 DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT, \
89 #define EVRSSQM_syslog(LEVEL, FORMAT) \
90 DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
92 #define EVRSSCSM_syslog(LEVEL, FORMAT) \
93 DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT, \
96 void DeleteRoomReference(long QRnumber)
/*
 * Drop the RSSQueueRooms bookkeeping for room QRnumber.  The function looks
 * up the room's rss_room_counter and removes the hash entry once its count
 * is 0.
 * The caller must hold RSSQueueMutex; UnlinkRSSAggregator() takes it before
 * calling UnlinkRooms(), which calls this.
 * NOTE(review): the decrement of pRoomC->count, a NULL check on vData and the
 * release of the HashPos `At` are not visible here.  Confirm all three exist.
 */
102 rss_room_counter *pRoomC;
104 At = GetNewHashPos(RSSQueueRooms, 0);
106 if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
/* HKLen/HK are out-parameters GetHashPos requires; only vData is used. */
108 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
111 pRoomC = (rss_room_counter *) vData;
/* Last reference gone: forget the room so a later scan can queue it again. */
113 if (pRoomC->count == 0)
114 DeleteEntryFromHash(RSSQueueRooms, At);
/*
 * Release every room reference held by an aggregator.  This covers its
 * primary room (QRnumber) and each room number stored in OtherQRnumbers.
 * OtherQRnumbers is filled by rssclient_scan_room() when further rooms
 * subscribe to the same URL.
 * The walk over OtherQRnumbers stops early once server_shutting_down is set.
 * Runs with RSSQueueMutex held (called from UnlinkRSSAggregator()).
 */
120 void UnlinkRooms(rss_aggregator *RSSAggr)
122 DeleteRoomReference(RSSAggr->QRnumber);
123 if (RSSAggr->OtherQRnumbers != NULL)
130 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
131 while (! server_shutting_down &&
132 GetNextHashPos(RSSAggr->OtherQRnumbers,
/* Values in OtherQRnumbers are heap-allocated longs holding room numbers. */
138 long *lData = (long*) vData;
139 DeleteRoomReference(*lData);
/*
 * Detach an aggregator from the shared queue.  Under RSSQueueMutex it:
 *  - releases the aggregator's room references;
 *  - removes its URL entry from RSSFetchUrls;
 *  - stamps last_run.
 * Whether removing the entry also destroys the aggregator depends on the
 * value destructor given when it was Put() into RSSFetchUrls (not visible
 * here; presumably DeleteRssCfg).  Do not touch RSSAggr after this returns
 * until that is confirmed.
 *
 * This function locks RSSQueueMutex itself, so callers must NOT already hold
 * it.  The mutex is initialised with default attributes in CTDL_MODULE_INIT,
 * and POSIX does not permit the owning thread to lock such a mutex again
 * (it deadlocks on common implementations).
 */
146 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
150 pthread_mutex_lock(&RSSQueueMutex);
151 UnlinkRooms(RSSAggr);
153 At = GetNewHashPos(RSSFetchUrls, 0);
154 if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
156 DeleteEntryFromHash(RSSFetchUrls, At);
/* rssclient_scan() waits 900 s from this stamp before polling again. */
159 last_run = time(NULL);
160 pthread_mutex_unlock(&RSSQueueMutex);
/*
 * Release the resources owned by an rss_aggregator.  The void * signature
 * lets it serve as a hash-value destructor; the Put() that would register it
 * is not visible here - confirm.
 * It frees the Url/rooms/CData/Key buffers, the OtherQRnumbers and Messages
 * hashes, the Pos iterator, recp.recp_room, the pending Item and the AsyncIO
 * contents.
 * NOTE(review): a free() of the Item pointer itself and of the aggregator
 * struct is not visible here.  Confirm both exist, or this leaks.
 */
163 void DeleteRssCfg(void *vptr)
165 rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
/* The EVRSSC*_syslog macros expect a variable named IO in scope. */
166 AsyncIO *IO = &RSSAggr->IO;
167 EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
169 FreeStrBuf(&RSSAggr->Url);
170 FreeStrBuf(&RSSAggr->rooms);
171 FreeStrBuf(&RSSAggr->CData);
172 FreeStrBuf(&RSSAggr->Key);
173 DeleteHash(&RSSAggr->OtherQRnumbers);
175 DeleteHashPos (&RSSAggr->Pos);
176 DeleteHash (&RSSAggr->Messages);
/* free(NULL) is a no-op, so this NULL check is redundant but harmless. */
177 if (RSSAggr->recp.recp_room != NULL)
178 free(RSSAggr->recp.recp_room);
/* Presumably releases the members of a partially parsed feed item. */
181 if (RSSAggr->Item != NULL)
183 flush_rss_item(RSSAggr->Item);
188 FreeAsyncIOContents(&RSSAggr->IO);
/*
 * AsyncIO terminate callback registered in rss_do_fetching().  It logs and
 * then unlinks the aggregator from the shared queue via
 * UnlinkRSSAggregator(), which takes RSSQueueMutex itself.
 * The eNextState returned is not visible here.
 * NOTE(review): RSSAggregator_TerminateDB logs the identical text, so the log
 * cannot tell the two termination paths apart.
 */
192 eNextState RSSAggregator_Terminate(AsyncIO *IO)
194 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
196 EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
199 UnlinkRSSAggregator(RSSAggr);
/*
 * Second terminate callback passed to InitcURLIOStruct() in
 * rss_do_fetching(); presumably the variant used on the database queue.
 * Its visible body matches RSSAggregator_Terminate: log, then unlink the
 * aggregator.  The eNextState returned is not visible here.
 */
203 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
205 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
207 EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
210 UnlinkRSSAggregator(RSSAggr);
/*
 * Shutdown-abort callback registered in rss_do_fetching().  It logs the feed
 * URL being abandoned and unlinks the aggregator.
 * NOTE(review): pUrl comes straight from IO->ConnectMe->PlainUrl, and no
 * NULL guard is visible here before it is passed to "%s".  Confirm ConnectMe
 * and PlainUrl are always set when this fires.
 */
214 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
217 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
219 pUrl = IO->ConnectMe->PlainUrl;
223 EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
226 UnlinkRSSAggregator(RSSAggr);
/*
 * Placeholder abort handler: ignores IO and always returns eAbort (marked
 * TODO).  NOTE(review): not referenced anywhere in the code visible here.
 */
231 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
233 return eAbort; ///TODO
/*
 * DB-queue step: post the current feed item (ThisMsg), record its GUID in
 * the use table, then continue with the next item.
 * If another item exists in Messages, the next DB operation checks it against
 * the use table (RSS_FetchNetworkUsetableEntry).  The end-of-list return
 * value is not visible here.
 * NOTE(review): CtdlSubmitMsg()'s result is not checked, and the GUID is
 * stored as seen unconditionally.  If submission can fail, that item will
 * never be retried - confirm.
 */
236 eNextState RSSSaveMessage(AsyncIO *IO)
240 rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
/* SmashStrBuf presumably hands over the buffer's char data and releases the StrBuf wrapper. */
242 RSSAggr->ThisMsg->Msg.cm_fields['M'] =
243 SmashStrBuf(&RSSAggr->ThisMsg->Message);
245 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
247 /* write the uidl to the use table so we don't store this item again */
248 cdb_store(CDB_USETABLE,
249 SKEY(RSSAggr->ThisMsg->MsgGUID),
250 &RSSAggr->ThisMsg->ut,
251 sizeof(struct UseTable) );
253 if (GetNextHashPos(RSSAggr->Messages,
256 (void**) &RSSAggr->ThisMsg))
257 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
/*
 * DB-queue step: decide whether the current feed item (ThisMsg) was already
 * imported.  It fills ThisMsg->ut with the item GUID and the current time,
 * then looks the GUID up in CDB_USETABLE:
 *  - a known item has its record rewritten (refreshing the timestamp), and
 *    processing chains to the next item, if any;
 *  - otherwise RSSSaveMessage is scheduled to post it.
 * NOTE(review): the NULL test on cdbut and the matching cdb_free() are not
 * visible here - confirm the fetched record is released.
 */
262 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
266 struct cdbdata *cdbut;
267 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
269 /* Find out if we've already seen this item */
/*
 * NOTE(review): unbounded copy.  MsgGUID comes from the remote feed, and
 * ut_msgid is presumably a fixed-size array, since struct UseTable is stored
 * as a sizeof() blob.  A long GUID overflows it.  Bound the copy, e.g.
 * safestrncpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID),
 *             sizeof(Ctx->ThisMsg->ut.ut_msgid)),
 * as rss_do_fetching() already does for cs_host.
 */
270 strcpy(Ctx->ThisMsg->ut.ut_msgid,
271 ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
272 Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
274 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
277 /* Item has already been seen */
278 EVRSSC_syslog(LOG_DEBUG,
279 "%s has already been seen\n",
280 ChrPtr(Ctx->ThisMsg->MsgGUID));
283 /* rewrite the record anyway, to update the timestamp */
284 cdb_store(CDB_USETABLE,
285 SKEY(Ctx->ThisMsg->MsgGUID),
286 &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
288 if (GetNextHashPos(Ctx->Messages,
291 (void**) &Ctx->ThisMsg))
292 return NextDBOperation(
294 RSS_FetchNetworkUsetableEntry);
/*
 * NOTE(review): this call's eNextState is discarded, unlike the already-seen
 * path above, which returns it.  Confirm the function's final return value
 * is intended.
 */
301 NextDBOperation(IO, RSSSaveMessage);
/*
 * Start an asynchronous fetch for one aggregator.
 * The feed is skipped while next_poll is set and still in the future.
 * Otherwise this function:
 *  - allocates a zeroed rss_item;
 *  - initialises the curl AsyncIO context with the parse/terminate callbacks;
 *  - copies the URL into the session's cs_host, bounded by its size;
 *  - parses the URL and queues the request.
 * The return statements are not visible here.  rssclient_scan() treats a 0
 * return as "could not start" and unlinks the aggregator.
 */
309 int rss_do_fetching(rss_aggregator *RSSAggr)
311 AsyncIO *IO = &RSSAggr->IO;
/* `now` is declared in lines not visible here. */
317 if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
/*
 * NOTE(review): the malloc() result is not checked; calloc() would also
 * replace the memset.  Where `ri` is stored is not visible.  If it is only
 * attached after InitcURLIOStruct() succeeds, the failure path below leaks it.
 */
320 ri = (rss_item*) malloc(sizeof(rss_item));
321 memset(ri, 0, sizeof(rss_item));
324 if (! InitcURLIOStruct(&RSSAggr->IO,
326 "Citadel RSS Client",
327 RSSAggregator_ParseReply,
328 RSSAggregator_Terminate,
329 RSSAggregator_TerminateDB,
330 RSSAggregator_ShutdownAbort))
332 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
336 safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
337 ChrPtr(RSSAggr->Url),
338 sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
340 EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
/* 80 is presumably the default port when the URL names none; any result of ParseURL is ignored here. */
341 ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
342 CurlPrepareURL(RSSAggr->IO.ConnectMe);
344 QueueCurlContext(&RSSAggr->IO);
349 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
351 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
/*
 * CtdlForEachRoom() callback (see rssclient_scan).  It reads the room's
 * netconfig file and registers each "rssclient|<url>|..." line:
 *  - a URL not yet in RSSFetchUrls gets a new rss_aggregator;
 *  - a URL already queued gets this room's name appended to its room list
 *    and its number added to OtherQRnumbers.
 * The room itself is then recorded in RSSQueueRooms.  Rooms already in
 * RSSQueueRooms are skipped.  The scan bails out early on server shutdown or
 * when the file cannot be opened, stat'ed or read.
 */
353 StrBuf *CfgData=NULL;
356 rss_room_counter *Count = NULL;
358 char filename[PATH_MAX];
361 rss_aggregator *RSSAggr = NULL;
362 rss_aggregator *use_this_RSSAggr = NULL;
364 const char *CfgPtr, *lPtr;
367 pthread_mutex_lock(&RSSQueueMutex);
368 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
370 EVRSSQ_syslog(LOG_DEBUG,
371 "rssclient: [%ld] %s already in progress.\n",
374 pthread_mutex_unlock(&RSSQueueMutex);
377 pthread_mutex_unlock(&RSSQueueMutex);
379 assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
381 if (server_shutting_down)
384 /* Only do net processing for rooms that have netconfigs */
/* NOTE(review): flag value 0 presumably means read-only; spelling it O_RDONLY is portable and clearer. */
385 fd = open(filename, 0);
388 "rssclient: %s no config.\n",
393 if (server_shutting_down)
396 if (fstat(fd, &statbuf) == -1) {
397 EVRSSQ_syslog(LOG_DEBUG,
398 "ERROR: could not stat configfile '%s' - %s\n",
404 if (server_shutting_down)
407 CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
409 if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
411 FreeStrBuf(&CfgData);
/*
 * NOTE(review): errno is read after FreeStrBuf(), which may overwrite it;
 * save it immediately after the failing read.  The "<br>" in this syslog
 * text looks like an HTML leftover.  Also confirm fd is closed on this path.
 */
412 EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s<br>\n",
413 filename, strerror(errno));
417 if (server_shutting_down)
421 CfgType = NewStrBuf();
/* Sized to the whole file so that any single line fits. */
422 Line = NewStrBufPlain(NULL, StrLength(CfgData));
426 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
427 if (StrLength(Line) > 0)
430 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
431 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
436 sizeof(rss_room_counter));
/* NOTE(review): malloc() result unchecked; calloc() would replace the memset. */
440 RSSAggr = (rss_aggregator *) malloc(
441 sizeof(rss_aggregator));
443 memset (RSSAggr, 0, sizeof(rss_aggregator));
444 RSSAggr->QRnumber = qrbuf->QRnumber;
445 RSSAggr->roomlist_parts = 1;
446 RSSAggr->Url = NewStrBuf();
448 StrBufExtract_NextToken(RSSAggr->Url,
/* Is this URL already queued by another room? */
453 pthread_mutex_lock(&RSSQueueMutex);
454 GetHash(RSSFetchUrls,
458 use_this_RSSAggr = (rss_aggregator *)vptr;
459 if (use_this_RSSAggr != NULL)
462 StrBufAppendBufPlain(
463 use_this_RSSAggr->rooms,
/* First extra room for this URL: create the room-number hash lazily. */
466 if (use_this_RSSAggr->roomlist_parts==1)
468 use_this_RSSAggr->OtherQRnumbers
469 = NewHash(1, lFlathash);
471 QRnumber = (long*)malloc(sizeof(long));
472 *QRnumber = qrbuf->QRnumber;
473 Put(use_this_RSSAggr->OtherQRnumbers,
474 LKEY(qrbuf->QRnumber),
477 use_this_RSSAggr->roomlist_parts++;
479 pthread_mutex_unlock(&RSSQueueMutex);
/* NOTE(review): the duplicate aggregator's struct is presumably freed in lines not visible here - confirm no leak. */
481 FreeStrBuf(&RSSAggr->Url);
486 pthread_mutex_unlock(&RSSQueueMutex);
/* New URL: finish initialising the aggregator and publish it in RSSFetchUrls. */
488 RSSAggr->ItemType = RSS_UNSET;
490 RSSAggr->rooms = NewStrBufPlain(
493 pthread_mutex_lock(&RSSQueueMutex);
500 pthread_mutex_unlock(&RSSQueueMutex);
/* Mark this room as in progress so later scans skip it until its feeds finish. */
506 Count->QRnumber = qrbuf->QRnumber;
507 pthread_mutex_lock(&RSSQueueMutex);
508 EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n",
509 qrbuf->QRnumber, qrbuf->QRname);
510 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
511 pthread_mutex_unlock(&RSSQueueMutex);
/* NOTE(review): a FreeStrBuf(&Line) is not visible alongside these - confirm it exists. */
513 FreeStrBuf(&CfgData);
514 FreeStrBuf(&CfgType);
519 * Scan for rooms that have RSS client requests configured
521 void rssclient_scan(void) {
/*
 * EVT_TIMER session hook (registered in CTDL_MODULE_INIT).  It runs only if
 * both of these hold:
 *  - at least 900 s have passed since last_run;
 *  - no room or URL is still queued.
 * It then walks all rooms as the rssclient system session to build the fetch
 * queue, and starts a fetch for every queued URL.
 */
522 int RSSRoomCount, RSSCount;
523 rss_aggregator *rptr = NULL;
528 time_t now = time(NULL);
530 /* Run no more than once every 15 minutes. */
/*
 * NOTE(review): the two values logged below have type time_t but are printed
 * with %ld.  That is only correct where time_t is long; cast to long for
 * portability.
 */
531 if ((now - last_run) < 900) {
532 EVRSSQ_syslog(LOG_DEBUG,
533 "Client: polling interval not yet reached; last run was %ldm%lds ago",
534 ((now - last_run) / 60),
535 ((now - last_run) % 60)
541 * This is a simple concurrency check to make sure only one rssclient
542 * run is done at a time.
544 pthread_mutex_lock(&RSSQueueMutex);
545 RSSCount = GetCount(RSSFetchUrls);
546 RSSRoomCount = GetCount(RSSQueueRooms);
547 pthread_mutex_unlock(&RSSQueueMutex);
549 if ((RSSRoomCount > 0) || (RSSCount > 0)) {
550 EVRSSQ_syslog(LOG_DEBUG,
551 "rssclient: concurrency check failed; %d rooms and %d url's are queued",
552 RSSRoomCount, RSSCount
557 become_session(&rss_CC);
558 EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
559 CtdlForEachRoom(rssclient_scan_room, NULL);
/*
 * NOTE(review): RSSQueueMutex is held across this loop, yet the failure path
 * calls UnlinkRSSAggregator(), which locks the same mutex again.  The mutex
 * has default (non-recursive) attributes, so this relock is not permitted by
 * POSIX and deadlocks on common implementations.  UnlinkRSSAggregator() also
 * deletes the current entry from RSSFetchUrls while `it` is iterating it.
 * Collect the failed aggregators and unlink them after the unlock below.
 */
561 pthread_mutex_lock(&RSSQueueMutex);
563 it = GetNewHashPos(RSSFetchUrls, 0);
564 while (!server_shutting_down &&
565 GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
567 rptr = (rss_aggregator *)vrptr;
568 if (!rss_do_fetching(rptr))
569 UnlinkRSSAggregator(rptr);
572 pthread_mutex_unlock(&RSSQueueMutex);
574 EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
/*
 * Cleanup hook (registered with CtdlRegisterEVCleanupHook): destroy both
 * queue hashes.  RSSQueueMutex is left alive, as the destroy call below is
 * commented out (TODO).
 * NOTE(review): the commented-out call uses a citthread_* name.  The mutex
 * is created with pthread_mutex_init(), so pthread_mutex_destroy() is the
 * matching call.
 */
578 void rss_cleanup(void)
580 /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
581 DeleteHash(&RSSFetchUrls);
582 DeleteHash(&RSSQueueRooms);
/*
 * Debug-flag hook for "rssclient".  It stores n in RSSClientDebugEnabled,
 * which DBGLOG() checks before emitting LOG_DEBUG messages.
 */
585 void LogDebugEnableRSSClient(const int n)
587 RSSClientDebugEnabled = n;
/*
 * Module initialiser.  It:
 *  - prepares the "rssclient" system context;
 *  - initialises the queue mutex with default attributes;
 *  - creates RSSQueueRooms (long keys, lFlathash) and RSSFetchUrls (hash
 *    function NULL, presumably the library default);
 *  - logs the libcurl version;
 *  - registers the timer scan, cleanup and debug-flag hooks.
 * Any threading guard and the return value are not visible here.
 */
590 CTDL_MODULE_INIT(rssclient)
594 CtdlFillSystemContext(&rss_CC, "rssclient");
595 pthread_mutex_init(&RSSQueueMutex, NULL);
596 RSSQueueRooms = NewHash(1, lFlathash);
597 RSSFetchUrls = NewHash(1, NULL);
598 syslog(LOG_INFO, "%s\n", curl_version());
599 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
600 CtdlRegisterEVCleanupHook(rss_cleanup);
601 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);