2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2012 by the citadel.org team
6 * This program is open source software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 3.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
33 #include <sys/types.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
40 #include "citserver.h"
44 #include "ctdl_module.h"
46 #include "parsedate.h"
48 #include "citadel_dirs.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
/*
 * TMP_SHORTER_URLS and TMP_SHORTER_URL_OFFSET are used as cm_fields[]
 * indices, and only from the disabled ("#if 0") shorter-URL code in
 * rss_format_item(). No use of TMP_MSGDATA is visible in this section.
 */
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
/*
 * Module-wide queue state. RSSQueueRooms is keyed by room number (LKEY) and
 * RSSFetchUrls by feed URL (SKEY); both are created in the module init code
 * and destroyed in rss_cleanup().
 */
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
/* Event callbacks handed to InitcURLIOStruct() in rss_do_fetching(). */
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
/* Context filled by CtdlFillSystemContext() and entered via become_session(). */
68 struct CitContext rss_CC;
70 struct rssnetcfg *rnclist = NULL;
/* Non-zero enables LOG_DEBUG output of the EVRSS*_syslog macros (see DBGLOG). */
71 int RSSClientDebugEnabled = 0;
/*
 * Logging helpers.
 *
 * N               - room number of the aggregator stored in IO->Data; needs a
 *                   variable named IO in the calling scope.
 * DBGLOG          - statement prefix: the statement that follows runs unless
 *                   LEVEL is LOG_DEBUG while RSSClientDebugEnabled is 0.
 *                   It expands to a bare "if", so a following "else" at the
 *                   call site would bind to it; brace call sites accordingly.
 * EVRSSC_syslog   - prefixes the message with IO->ID, CCID and N.
 * EVRSSQ_syslog   - prefixes the message with "RSS" only.
 * The *M_syslog variants take a format string without further arguments.
 */
72 #define N ((rss_aggregator*)IO->Data)->QRnumber
74 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
76 #define EVRSSC_syslog(LEVEL, FORMAT, ...) \
77 DBGLOG(LEVEL) syslog(LEVEL, \
78 "IO[%ld]CC[%d][%ld]RSS" FORMAT, \
79 IO->ID, CCID, N, __VA_ARGS__)
81 #define EVRSSCM_syslog(LEVEL, FORMAT) \
82 DBGLOG(LEVEL) syslog(LEVEL, \
83 "IO[%ld]CC[%d][%ld]RSS" FORMAT, \
86 #define EVRSSQ_syslog(LEVEL, FORMAT, ...) \
87 DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT, \
89 #define EVRSSQM_syslog(LEVEL, FORMAT) \
90 DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
92 #define EVRSSCSM_syslog(LEVEL, FORMAT) \
93 DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT, \
/*
 * Processing states of one aggregator run. Values used in this file include
 * eRSSCreated, eRSSFetching, eRSSFailure, eRSSParsing and eRSSUT.
 */
96 typedef enum _RSSState {
/*
 * Display strings for the states. SetRSSState() indexes this array directly
 * with the RSSState value, so the entry order presumably has to match the
 * enum order - TODO confirm against the enum definition.
 */
103 ConstStr RSSStates[] = {
104 {HKEY("Aggregator created")},
105 {HKEY("Fetching content")},
107 {HKEY("parsing content")},
108 {HKEY("checking usetable")}
/*
 * Publish the aggregator's current state: copies the display string for
 * State from RSSStates[] into cs_clientname of the context attached to IO.
 * len + 1 bytes are copied, presumably to include the terminating NUL.
 * NOTE(review): no bounds check on State or on the size of cs_clientname is
 * visible here - confirm the longest state string fits.
 */
111 static void SetRSSState(AsyncIO *IO, RSSState State)
113 CitContext* CCC = IO->CitContext;
114 memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1);
/*
 * Drop one reference to room QRnumber in RSSQueueRooms: looks up the room's
 * rss_room_counter and deletes the hash entry once its count is zero.
 * NOTE(review): the decrement of count is presumably done just before the
 * zero test - confirm. The only caller in this file, UnlinkRooms(), is
 * reached with RSSQueueMutex held by UnlinkRSSAggregator().
 */
117 void DeleteRoomReference(long QRnumber)
123 rss_room_counter *pRoomC;
125 At = GetNewHashPos(RSSQueueRooms, 0);
127 if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
129 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
132 pRoomC = (rss_room_counter *) vData;
134 if (pRoomC->count == 0)
135 DeleteEntryFromHash(RSSQueueRooms, At);
/*
 * Release the room references held by an aggregator: its primary room
 * (QRnumber) and, when present, every room number stored in OtherQRnumbers.
 * The loop over OtherQRnumbers stops early once server_shutting_down is set.
 */
141 void UnlinkRooms(rss_aggregator *RSSAggr)
143 DeleteRoomReference(RSSAggr->QRnumber);
144 if (RSSAggr->OtherQRnumbers != NULL)
151 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
152 while (! server_shutting_down &&
153 GetNextHashPos(RSSAggr->OtherQRnumbers,
/* values in OtherQRnumbers are heap-allocated longs (see rssclient_scan_room) */
159 long *lData = (long*) vData;
160 DeleteRoomReference(*lData);
/*
 * Remove an aggregator from the module's queues. Under RSSQueueMutex it
 * releases the aggregator's room references, deletes its entry (keyed by
 * URL) from RSSFetchUrls and stamps last_run with the current time.
 * NOTE(review): this function takes RSSQueueMutex itself, so callers must
 * not already hold it unless the mutex is recursive - verify call sites.
 * NOTE(review): deleting the hash entry presumably destroys RSSAggr via the
 * entry's destructor; do not touch RSSAggr afterwards - confirm.
 */
167 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
171 pthread_mutex_lock(&RSSQueueMutex);
172 UnlinkRooms(RSSAggr);
174 At = GetNewHashPos(RSSFetchUrls, 0);
175 if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
177 DeleteEntryFromHash(RSSFetchUrls, At);
180 last_run = time(NULL);
181 pthread_mutex_unlock(&RSSQueueMutex);
/*
 * Release everything owned by an rss_aggregator passed as an untyped
 * pointer: its string buffers, hashes, hash position, recipient room string,
 * pending item and the embedded AsyncIO contents. The struct is zeroed at
 * the end.
 * NOTE(review): the void* signature suggests this is a hash-entry destructor
 * callback; freeing of the struct itself is presumably done right after the
 * memset - confirm.
 */
184 void DeleteRssCfg(void *vptr)
186 rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
187 AsyncIO *IO = &RSSAggr->IO;
/* only log when a context is attached to the IO */
189 if (IO->CitContext != NULL)
190 EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
192 FreeStrBuf(&RSSAggr->Url);
193 FreeStrBuf(&RSSAggr->rooms);
194 FreeStrBuf(&RSSAggr->CData);
195 FreeStrBuf(&RSSAggr->Key);
196 DeleteHash(&RSSAggr->OtherQRnumbers);
198 DeleteHashPos (&RSSAggr->Pos);
199 DeleteHash (&RSSAggr->Messages);
200 if (RSSAggr->recp.recp_room != NULL)
201 free(RSSAggr->recp.recp_room);
204 if (RSSAggr->Item != NULL)
206 flush_rss_item(RSSAggr->Item);
211 FreeAsyncIOContents(&RSSAggr->IO);
/* scrub the struct so stale pointers cannot be reused */
212 memset(RSSAggr, 0, sizeof(rss_aggregator));
/*
 * Termination callback registered with InitcURLIOStruct(): stops the curl
 * watchers of this IO and unlinks the aggregator from the module queues.
 */
216 eNextState RSSAggregator_Terminate(AsyncIO *IO)
218 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
220 EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
222 StopCurlWatchers(IO);
223 UnlinkRSSAggregator(RSSAggr);
/*
 * Termination callback for the database side: same as
 * RSSAggregator_Terminate() but stops the DB watchers instead of the curl
 * watchers. The log text is identical to the one in
 * RSSAggregator_Terminate(), so the two cannot be told apart in the log.
 */
227 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
229 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
231 EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
234 StopDBWatchers(&RSSAggr->IO);
235 UnlinkRSSAggregator(RSSAggr);
/*
 * Shutdown callback registered with InitcURLIOStruct(): logs the feed URL
 * being aborted, stops the curl watchers and unlinks the aggregator.
 * NOTE(review): pUrl is read from IO->ConnectMe->PlainUrl; any NULL fallback
 * is not visible here - confirm before passing it to "%s".
 */
239 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
242 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
244 pUrl = IO->ConnectMe->PlainUrl;
248 EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
250 StopCurlWatchers(IO);
251 UnlinkRSSAggregator(RSSAggr);
/*
 * Append an HTML anchor for link to Message, followed by "<br>" and a
 * newline. Nothing is appended when link is empty. The anchor text is
 * LinkTitle when non-empty, otherwise Title when it is a non-empty C string,
 * otherwise (presumably, as the final fallback) the link itself.
 * Callers in this file pass (Message, link, LinkTitle, Title).
 * NOTE(review): link and title text are appended as-is; no HTML escaping is
 * visible here - confirm whether feed data is sanitised elsewhere.
 */
255 void AppendLink(StrBuf *Message,
260 if (StrLength(link) > 0)
262 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
263 StrBufAppendBuf(Message, link, 0);
264 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
265 if (StrLength(LinkTitle) > 0)
266 StrBufAppendBuf(Message, LinkTitle, 0);
267 else if ((Title != NULL) && !IsEmptyStr(Title))
268 StrBufAppendBufPlain(Message, Title, -1, 0);
270 StrBufAppendBuf(Message, link, 0);
271 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
/*
 * Turn a parsed feed item into a storable message.
 *
 * Fills the message fields of SaveMsg->Msg:
 *   'A' and 'P' from the item's author / author e-mail (with "rss" and
 *       "rss@localhost" as fallbacks),
 *   'N' from NODENAME,
 *   'U' from the item's title (HTML stripped, RFC 2047 encoded),
 * and builds an HTML body (content-type header, description, link and
 * reply link) that is left in SaveMsg->Message for the caller.
 * Several StrBufs of SaveMsg are consumed via SmashStrBuf() in the process.
 */
276 void rss_format_item(networker_save_message *SaveMsg)
/* --- author handling --- */
281 if (SaveMsg->author_or_creator != NULL) {
284 StrBuf *Encoded = NULL;
/* strip HTML from the author string and write the result back in place */
287 From = html_to_ascii(ChrPtr(SaveMsg->author_or_creator),
288 StrLength(SaveMsg->author_or_creator),
290 StrBufPlain(SaveMsg->author_or_creator, From, -1);
291 StrBufTrim(SaveMsg->author_or_creator);
/* does the author string itself look like an address? */
294 FromAt = strchr(ChrPtr(SaveMsg->author_or_creator), '@') != NULL;
/* plain name plus separate e-mail: encoded name in 'A', e-mail in 'P' */
295 if (!FromAt && StrLength (SaveMsg->author_email) > 0)
297 StrBufRFC2047encode(&Encoded, SaveMsg->author_or_creator);
298 SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded);
299 SaveMsg->Msg.cm_fields['P'] =
300 SmashStrBuf(&SaveMsg->author_email);
/* author used verbatim for 'A' and duplicated into 'P' */
306 SaveMsg->Msg.cm_fields['A'] =
307 SmashStrBuf(&SaveMsg->author_or_creator);
308 SaveMsg->Msg.cm_fields['P'] =
309 strdup(SaveMsg->Msg.cm_fields['A']);
/* encoded name in 'A', fixed placeholder address in 'P' */
313 StrBufRFC2047encode(&Encoded,
314 SaveMsg->author_or_creator);
315 SaveMsg->Msg.cm_fields['A'] =
316 SmashStrBuf(&Encoded);
317 SaveMsg->Msg.cm_fields['P'] =
318 strdup("rss@localhost");
/* no author information at all */
324 SaveMsg->Msg.cm_fields['A'] = strdup("rss");
327 SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME);
/* --- title handling --- */
328 if (SaveMsg->title != NULL) {
331 StrBuf *Encoded, *QPEncoded;
334 StrBufSpaceToBlank(SaveMsg->title);
335 len = StrLength(SaveMsg->title);
336 Sbj = html_to_ascii(ChrPtr(SaveMsg->title), len, 512, 0);
/* a trailing newline left by the conversion is handled here */
338 if ((len > 0) && (Sbj[len - 1] == '\n'))
343 Encoded = NewStrBufPlain(Sbj, len);
347 StrBufRFC2047encode(&QPEncoded, Encoded);
349 SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded);
350 FreeStrBuf(&Encoded);
/* --- body --- */
/* guarantee a non-NULL (possibly empty) link buffer */
352 if (SaveMsg->link == NULL)
353 SaveMsg->link = NewStrBufPlain(HKEY(""));
355 #if 0 /* temporarily disable shorter urls. */
356 SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
357 GetShorterUrls(SaveMsg->description);
/* size estimate for the body buffer: fixed overhead + link + description */
360 msglen += 1024 + StrLength(SaveMsg->link) + StrLength(SaveMsg->description) ;
362 Message = NewStrBufPlain(NULL, msglen);
364 StrBufPlain(Message, HKEY(
365 "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
367 #if 0 /* disable shorter url for now. */
368 SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
370 StrBufAppendBuf(Message, SaveMsg->description, 0);
371 StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
373 AppendLink(Message, SaveMsg->link, SaveMsg->linkTitle, NULL);
374 AppendLink(Message, SaveMsg->reLink, SaveMsg->reLinkTitle, "Reply to this");
375 StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
/* hand the finished body to the caller via SaveMsg */
378 SaveMsg->Message = Message;
/*
 * DB-thread step: store the aggregator's current item (ThisMsg).
 * Formats the item, moves the generated body into message field 'M',
 * submits the message to the aggregator's recipients and records the item's
 * GUID in the use table. If the Messages hash yields a further item, the
 * use-table check for it is scheduled as the next DB operation.
 */
381 eNextState RSSSaveMessage(AsyncIO *IO)
385 rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
387 rss_format_item(RSSAggr->ThisMsg);
389 RSSAggr->ThisMsg->Msg.cm_fields['M'] =
390 SmashStrBuf(&RSSAggr->ThisMsg->Message);
392 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
394 /* write the uidl to the use table so we don't store this item again */
396 CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, IO->Now, 0, eWrite, IO->ID, CCID);
/* advance to the next queued item, overwriting ThisMsg */
398 if (GetNextHashPos(RSSAggr->Messages,
401 (void**) &RSSAggr->ThisMsg))
402 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
/*
 * DB-thread step: decide whether the current item (ThisMsg) is new.
 * Looks the item's GUID up in the use table. An already-seen item is logged
 * and skipped, and the same check is scheduled for the next item in the
 * Messages hash if there is one. Otherwise RSSSaveMessage is scheduled for
 * this item.
 */
407 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
411 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
413 /* Find out if we've already seen this item */
416 SetRSSState(IO, eRSSUT);
417 if (CheckIfAlreadySeen("RSS Item Seen",
418 Ctx->ThisMsg->MsgGUID,
420 IO->Now - USETABLE_ANTIEXPIRE,
425 /* Item has already been seen */
426 EVRSSC_syslog(LOG_DEBUG,
427 "%s has already been seen\n",
428 ChrPtr(Ctx->ThisMsg->MsgGUID));
429 SetRSSState(IO, eRSSParsing);
/* move on to the next item, if any */
431 if (GetNextHashPos(Ctx->Messages,
434 (void**) &Ctx->ThisMsg))
435 return NextDBOperation(
437 RSS_FetchNetworkUsetableEntry);
/* item not seen before: go and store it */
444 SetRSSState(IO, eRSSParsing);
446 NextDBOperation(IO, RSSSaveMessage);
/*
 * DB-thread step run after the HTTP fetch has finished.
 *
 * A reply with a status other than 200 is treated as a failure: it is
 * logged and an error text containing the feed URL, the status code and the
 * response body is assembled for a "RSS Aggregation run failure" report.
 *
 * Otherwise an MD5 digest over the reply body plus the feed URL is turned
 * into a GUID (hex digest + "_rssFM", cut to 40 characters) and looked up in
 * the use table under "RSS Whole". An already-seen feed is only logged;
 * an unseen one is passed on to RSSAggregator_ParseReply().
 */
452 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
454 u_char rawdigest[MD5_DIGEST_LEN];
455 struct MD5Context md5context;
457 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
/* --- failure path: anything but HTTP 200 --- */
459 if (IO->HttpReq.httpcode != 200)
465 SetRSSState(IO, eRSSFailure);
466 ErrMsg = NewStrBuf();
467 EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n",
468 IO->HttpReq.httpcode);
470 strs[0] = ChrPtr(Ctx->Url);
471 lens[0] = StrLength(Ctx->Url);
473 strs[1] = ChrPtr(Ctx->rooms);
474 lens[1] = StrLength(Ctx->rooms);
476 "Error while RSS-Aggregation Run of %s\n"
477 " need a 200, got a %ld !\n"
478 " Response text was: \n"
481 IO->HttpReq.httpcode,
482 ChrPtr(IO->HttpReq.ReplyData));
485 "RSS Aggregation run failure",
486 2, strs, (long*) &lens,
491 EVRSSC_syslog(LOG_DEBUG,
492 "RSS feed returned an invalid http status code. <%s><HTTP %ld>\n",
494 IO->HttpReq.httpcode);
/* --- whole-feed duplicate check --- */
497 SetRSSState(IO, eRSSUT);
/* digest covers the reply body followed by the feed URL */
499 MD5Init(&md5context);
501 MD5Update(&md5context,
502 (const unsigned char*)SKEY(IO->HttpReq.ReplyData));
504 MD5Update(&md5context,
505 (const unsigned char*)SKEY(Ctx->Url));
507 MD5Final(rawdigest, &md5context);
/*
 * NOTE(review): the size comment below mentions "_rss2ctdl" while the suffix
 * actually appended is "_rssFM".
 */
508 guid = NewStrBufPlain(NULL,
509 MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
510 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
511 StrBufAppendBufPlain(guid, HKEY("_rssFM"), 0);
/* GUIDs are capped at 40 characters */
512 if (StrLength(guid) > 40)
513 StrBufCutAt(guid, 40, NULL);
514 /* Find out if we've already seen this item */
518 if (CheckIfAlreadySeen("RSS Whole",
521 IO->Now - USETABLE_ANTIEXPIRE,
528 EVRSSC_syslog(LOG_DEBUG, "RSS feed already seen. <%s>\n", ChrPtr(Ctx->Url));
/* --- new content: parse it --- */
533 SetRSSState(IO, eRSSParsing);
534 return RSSAggregator_ParseReply(IO);
/*
 * HTTP-completion callback registered with InitcURLIOStruct(): queues
 * RSSAggregator_AnalyseReply as a DB operation for this IO and returns the
 * result of QueueDBOperation().
 */
537 eNextState RSSAggregator_FinishHttp(AsyncIO *IO)
539 return QueueDBOperation(IO, RSSAggregator_AnalyseReply);
/*
 * Start fetching one feed.
 * Skips feeds whose next_poll time has not been reached, allocates a fresh
 * rss_item, initialises the curl IO with this module's callbacks, records
 * the feed URL as cs_host of the IO's context, prepares the URL (default
 * port 80) and queues the curl context.
 * Returns an int that rssclient_scan() treats as "failed" when zero.
 */
545 int rss_do_fetching(rss_aggregator *RSSAggr)
547 AsyncIO *IO = &RSSAggr->IO;
/* not due yet */
553 if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
/* NOTE(review): malloc result is handed to memset without a NULL check */
556 ri = (rss_item*) malloc(sizeof(rss_item));
557 memset(ri, 0, sizeof(rss_item));
560 if (! InitcURLIOStruct(&RSSAggr->IO,
562 "Citadel RSS Client",
563 RSSAggregator_FinishHttp,
564 RSSAggregator_Terminate,
565 RSSAggregator_TerminateDB,
566 RSSAggregator_ShutdownAbort))
568 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
571 SetRSSState(IO, eRSSCreated);
/* record the feed URL as cs_host of the IO's context */
573 safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
574 ChrPtr(RSSAggr->Url),
575 sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
577 EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
578 ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
579 CurlPrepareURL(RSSAggr->IO.ConnectMe);
581 SetRSSState(IO, eRSSFetching);
582 QueueCurlContext(&RSSAggr->IO);
587 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
/*
 * Per-room callback (see CtdlForEachNetCfgRoom in rssclient_scan).
 * Rooms already present in RSSQueueRooms are skipped. For every rssclient
 * config line of the room an aggregator is prepared for the line's URL. If
 * RSSFetchUrls already has an aggregator for that URL, this room is added to
 * the existing one (room list string, OtherQRnumbers, roomlist_parts) and
 * the new aggregator's URL buffer is released. Otherwise the new aggregator
 * is set up further.
 * RSSQueueMutex is taken and released several times rather than held
 * throughout.
 */
589 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
591 const RoomNetCfgLine *pLine;
592 rss_aggregator *RSSAggr = NULL;
593 rss_aggregator *use_this_RSSAggr = NULL;
/* room already queued? */
596 pthread_mutex_lock(&RSSQueueMutex);
597 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
599 EVRSSQ_syslog(LOG_DEBUG,
600 "rssclient: [%ld] %s already in progress.\n",
603 pthread_mutex_unlock(&RSSQueueMutex);
606 pthread_mutex_unlock(&RSSQueueMutex);
608 if (server_shutting_down) return;
610 pLine = OneRNCFG->NetConfigs[rssclient];
/* one pass per rssclient config line of this room */
612 while (pLine != NULL)
614 const char *lPtr = NULL;
/* NOTE(review): no NULL check of the malloc result is visible before memset */
616 RSSAggr = (rss_aggregator *) malloc(
617 sizeof(rss_aggregator));
619 memset (RSSAggr, 0, sizeof(rss_aggregator));
620 RSSAggr->QRnumber = qrbuf->QRnumber;
621 RSSAggr->roomlist_parts = 1;
622 RSSAggr->Url = NewStrBufPlain(NULL, StrLength(pLine->Value[0]));
623 StrBufExtract_NextToken(RSSAggr->Url,
/* is this URL already being handled by another aggregator? */
628 pthread_mutex_lock(&RSSQueueMutex);
629 GetHash(RSSFetchUrls,
633 use_this_RSSAggr = (rss_aggregator *)vptr;
634 if (use_this_RSSAggr != NULL)
637 StrBufAppendBufPlain(
638 use_this_RSSAggr->rooms,
/* first additional room: create the hash of extra room numbers */
641 if (use_this_RSSAggr->roomlist_parts==1)
643 use_this_RSSAggr->OtherQRnumbers
644 = NewHash(1, lFlathash);
/* NOTE(review): malloc result is dereferenced without a NULL check */
646 QRnumber = (long*)malloc(sizeof(long));
647 *QRnumber = qrbuf->QRnumber;
648 Put(use_this_RSSAggr->OtherQRnumbers,
649 LKEY(qrbuf->QRnumber),
652 use_this_RSSAggr->roomlist_parts++;
654 pthread_mutex_unlock(&RSSQueueMutex);
/* the freshly built aggregator is not needed for a shared URL */
656 FreeStrBuf(&RSSAggr->Url);
662 pthread_mutex_unlock(&RSSQueueMutex);
/* new URL: finish setting up this aggregator */
664 RSSAggr->ItemType = RSS_UNSET;
666 RSSAggr->rooms = NewStrBufPlain(
669 pthread_mutex_lock(&RSSQueueMutex);
676 pthread_mutex_unlock(&RSSQueueMutex);
682 * Scan for rooms that have RSS client requests configured
/*
 * Timer hook (registered with EVT_TIMER in the module init code).
 * Does nothing when the last run was less than 900 seconds ago or when
 * rooms / URLs from a previous run are still queued. Otherwise it switches
 * to the rss_CC session, lets rssclient_scan_room() collect the feeds of
 * all rooms and starts a fetch for every entry in RSSFetchUrls.
 * Aggregators whose fetch could not be started are unlinked again.
 */
684 void rssclient_scan(void) {
685 int RSSRoomCount, RSSCount;
686 rss_aggregator *rptr = NULL;
691 time_t now = time(NULL);
693 /* Run no more than once every 15 minutes. */
694 if ((now - last_run) < 900) {
695 EVRSSQ_syslog(LOG_DEBUG,
696 "Client: polling interval not yet reached; last run was %ldm%lds ago",
697 ((now - last_run) / 60),
698 ((now - last_run) % 60)
704 * This is a simple concurrency check to make sure only one rssclient
705 * run is done at a time.
707 pthread_mutex_lock(&RSSQueueMutex);
708 RSSCount = GetCount(RSSFetchUrls);
709 RSSRoomCount = GetCount(RSSQueueRooms);
710 pthread_mutex_unlock(&RSSQueueMutex);
712 if ((RSSRoomCount > 0) || (RSSCount > 0)) {
713 EVRSSQ_syslog(LOG_DEBUG,
714 "rssclient: concurrency check failed; %d rooms and %d url's are queued",
715 RSSRoomCount, RSSCount
720 become_session(&rss_CC);
721 EVRSSQM_syslog(LOG_DEBUG, "rssclient started");
722 CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient);
/* NOTE(review): this count is read before RSSQueueMutex is taken */
724 if (GetCount(RSSFetchUrls) > 0)
726 pthread_mutex_lock(&RSSQueueMutex);
727 EVRSSQ_syslog(LOG_DEBUG,
728 "rssclient starting %d Clients",
729 GetCount(RSSFetchUrls));
731 it = GetNewHashPos(RSSFetchUrls, 0);
732 while (!server_shutting_down &&
733 GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
735 rptr = (rss_aggregator *)vrptr;
/*
 * NOTE(review): UnlinkRSSAggregator() locks RSSQueueMutex itself and deletes
 * from RSSFetchUrls, while this loop appears to hold that mutex and iterate
 * that hash - check for self-deadlock / iterator invalidation.
 */
736 if (!rss_do_fetching(rptr))
737 UnlinkRSSAggregator(rptr);
740 pthread_mutex_unlock(&RSSQueueMutex);
743 EVRSSQM_syslog(LOG_DEBUG, "Nothing to do.");
745 EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
/*
 * Cleanup hook (registered via CtdlRegisterEVCleanupHook): destroys the two
 * module-wide hashes. RSSQueueMutex is not destroyed (see the TODO below).
 */
749 void rss_cleanup(void)
751 /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
752 DeleteHash(&RSSFetchUrls);
753 DeleteHash(&RSSQueueRooms);
/*
 * Debug-flag hook (registered under the name "rssclient"): stores n in
 * RSSClientDebugEnabled, which gates LOG_DEBUG output of this module.
 */
756 void LogDebugEnableRSSClient(const int n)
758 RSSClientDebugEnabled = n;
/*
 * Module entry point for the RSS client.
 * Registers the rssclient room-config type (generic parser / serializer),
 * initialises RSSQueueMutex with default attributes, creates the room and
 * URL hashes, logs the libcurl version and registers the timer, cleanup and
 * debug-flag hooks. It also fills the rss_CC system context.
 * NOTE(review): which of these calls run in which init phase depends on
 * control flow not shown here - confirm.
 */
761 CTDL_MODULE_INIT(rssclient)
765 CtdlREGISTERRoomCfgType(rssclient, ParseGeneric, 0, 1, SerializeGeneric, DeleteGenericCfgLine); /// todo: implement rss specific parser
766 pthread_mutex_init(&RSSQueueMutex, NULL);
/* rooms hash uses lFlathash; the URL hash is created with a NULL hash function argument */
767 RSSQueueRooms = NewHash(1, lFlathash);
768 RSSFetchUrls = NewHash(1, NULL);
769 syslog(LOG_INFO, "%s\n", curl_version());
770 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
771 CtdlRegisterEVCleanupHook(rss_cleanup);
772 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
776 CtdlFillSystemContext(&rss_CC, "rssclient");