2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2012 by the citadel.org team
6 * This program is open source software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 3.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
33 #include <sys/types.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
40 #include "citserver.h"
44 #include "ctdl_module.h"
46 #include "parsedate.h"
48 #include "citadel_dirs.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
/*
 * Pseudo field slots in a message's cm_fields[] array. In this file they are
 * referenced only by the shorter-URL code in rss_format_item, which is
 * currently disabled with `#if 0`. TMP_MSGDATA is not referenced in the
 * visible code.
 */
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
/*
 * Shared queue state:
 *  - RSSQueueRooms: room number (LKEY) -> rss_room_counter.
 *  - RSSFetchUrls:  feed URL string (SKEY) -> rss_aggregator.
 * Accesses to either hash in this file happen with RSSQueueMutex held,
 * except in rss_cleanup. The mutex is created with default (NULL)
 * attributes in the module init, so it is not recursive.
 */
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
/* Termination callbacks handed to InitcURLIOStruct in rss_do_fetching. */
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
/*
 * rss_CC: system context the scan runs in (filled by CtdlFillSystemContext
 *         in the module init, entered with become_session in rssclient_scan).
 * rnclist: not referenced in the visible code.
 * RSSClientDebugEnabled: set by the "rssclient" debug-flag hook; non-zero
 *         lets LOG_DEBUG messages through DBGLOG().
 */
68 struct CitContext rss_CC;
70 struct rssnetcfg *rnclist = NULL;
71 int RSSClientDebugEnabled = 0;
/*
 * N: room number of the aggregator behind IO->Data. Only usable where a
 * variable named IO (an AsyncIO pointer) is in scope, as in the
 * EVRSSC_syslog family below.
 */
72 #define N ((rss_aggregator*)IO->Data)->QRnumber
/*
 * DBGLOG(LEVEL) guards the statement that follows it: non-debug levels
 * always pass, LOG_DEBUG only while RSSClientDebugEnabled is non-zero.
 * NOTE(review): it expands to a bare `if`, so a logging macro used as the
 * body of an unbraced if/else will capture that `else`; wrapping the log
 * macros in do { ... } while (0) would be safer.
 */
74 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
/*
 * Logging wrappers, all gated by DBGLOG:
 *  - EVRSSC_syslog / EVRSSCM_syslog: prefix "IO[%ld]CC[%d][%ld]RSS" built
 *    from IO->ID, CCID and N (need IO in scope; CCID is defined outside
 *    this excerpt);
 *  - EVRSSQ_syslog / EVRSSQM_syslog: prefix "RSS" only;
 *  - EVRSSCSM_syslog: prefix "IO[%ld][%ld]RSS".
 * The ...M variants take a format string with no extra arguments.
 * NOTE(review): several definitions below are truncated in this excerpt
 * (their final continuation lines are missing), so their complete argument
 * lists cannot be confirmed here.
 */
76 #define EVRSSC_syslog(LEVEL, FORMAT, ...) \
77 DBGLOG(LEVEL) syslog(LEVEL, \
78 "IO[%ld]CC[%d][%ld]RSS" FORMAT, \
79 IO->ID, CCID, N, __VA_ARGS__)
81 #define EVRSSCM_syslog(LEVEL, FORMAT) \
82 DBGLOG(LEVEL) syslog(LEVEL, \
83 "IO[%ld]CC[%d][%ld]RSS" FORMAT, \
86 #define EVRSSQ_syslog(LEVEL, FORMAT, ...) \
87 DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT, \
89 #define EVRSSQM_syslog(LEVEL, FORMAT) \
90 DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
92 #define EVRSSCSM_syslog(LEVEL, FORMAT) \
93 DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT, \
/*
 * Look up room QRnumber in RSSQueueRooms and delete its entry once the
 * room's rss_room_counter count is zero.
 *
 * Does not lock RSSQueueMutex itself. In the visible code it is reached only
 * through UnlinkRooms, which UnlinkRSSAggregator calls with the mutex held.
 * NOTE(review): the decrement of pRoomC->count and the release of the
 * HashPos `At` are on lines not shown in this excerpt; confirm both exist.
 */
96 void DeleteRoomReference(long QRnumber)
102 rss_room_counter *pRoomC;
104 At = GetNewHashPos(RSSQueueRooms, 0);
106 if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
108 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
111 pRoomC = (rss_room_counter *) vData;
/* Last reference gone: drop the room from the queue. */
113 if (pRoomC->count == 0)
114 DeleteEntryFromHash(RSSQueueRooms, At);
/*
 * Release the room references held by one aggregator: its primary room
 * (RSSAggr->QRnumber) and, when present, every extra room number stored in
 * RSSAggr->OtherQRnumbers. rssclient_scan_room fills that hash when several
 * rooms subscribe to the same feed URL.
 *
 * Called from UnlinkRSSAggregator with RSSQueueMutex held.
 * NOTE(review): the loop over OtherQRnumbers stops as soon as
 * server_shutting_down is set, leaving the remaining references in place.
 */
120 void UnlinkRooms(rss_aggregator *RSSAggr)
122 DeleteRoomReference(RSSAggr->QRnumber);
123 if (RSSAggr->OtherQRnumbers != NULL)
130 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
131 while (! server_shutting_down &&
132 GetNextHashPos(RSSAggr->OtherQRnumbers,
138 long *lData = (long*) vData;
139 DeleteRoomReference(*lData);
/*
 * Remove an aggregator from the shared queue:
 *  - release its room references (UnlinkRooms);
 *  - delete its RSSFetchUrls entry, keyed by RSSAggr->Url;
 *  - set last_run to now, which restarts rssclient_scan's 900 s polling
 *    interval.
 *
 * Takes RSSQueueMutex itself, so callers must NOT already hold it. The mutex
 * is initialised with default (non-recursive) attributes in the module init.
 * NOTE(review): rssclient_scan calls this while holding RSSQueueMutex, which
 * self-deadlocks (or is undefined behaviour) for a non-recursive mutex.
 * NOTE(review): if RSSFetchUrls owns its values, deleting the entry may free
 * RSSAggr; callers should not touch RSSAggr after this returns. Confirm.
 * NOTE(review): no DeleteHashPos(&At) is visible; confirm `At` is released.
 */
146 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
150 pthread_mutex_lock(&RSSQueueMutex);
151 UnlinkRooms(RSSAggr);
153 At = GetNewHashPos(RSSFetchUrls, 0);
154 if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
156 DeleteEntryFromHash(RSSFetchUrls, At);
159 last_run = time(NULL);
160 pthread_mutex_unlock(&RSSQueueMutex);
/*
 * Destructor for an rss_aggregator passed as void *. The signature suggests
 * it is the RSSFetchUrls value destructor; the Put() call that would confirm
 * this is not visible. It does the following:
 *  - frees the Url, rooms, CData and Key buffers;
 *  - frees the OtherQRnumbers hash, the Pos iterator and the Messages hash;
 *  - frees the recipient room string;
 *  - flushes any pending rss_item and releases the AsyncIO contents;
 *  - finally zeroes the whole struct.
 * NOTE(review): whether the rss_aggregator allocation itself, and the Item
 * after flushing, are freed happens on lines not shown. Verify.
 */
163 void DeleteRssCfg(void *vptr)
165 rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
166 AsyncIO *IO = &RSSAggr->IO;
/* Log only while a CitContext is attached (the macro uses IO and CCID). */
168 if (IO->CitContext != NULL)
169 EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
171 FreeStrBuf(&RSSAggr->Url);
172 FreeStrBuf(&RSSAggr->rooms);
173 FreeStrBuf(&RSSAggr->CData);
174 FreeStrBuf(&RSSAggr->Key);
175 DeleteHash(&RSSAggr->OtherQRnumbers);
177 DeleteHashPos (&RSSAggr->Pos);
178 DeleteHash (&RSSAggr->Messages);
179 if (RSSAggr->recp.recp_room != NULL)
180 free(RSSAggr->recp.recp_room);
/* A partially parsed feed item may still be attached. */
183 if (RSSAggr->Item != NULL)
185 flush_rss_item(RSSAggr->Item);
190 FreeAsyncIOContents(&RSSAggr->IO);
/* Wipe every field so no stale pointer survives in the struct. */
191 memset(RSSAggr, 0, sizeof(rss_aggregator));
/*
 * Terminate callback for the curl (HTTP) phase, passed to InitcURLIOStruct
 * in rss_do_fetching. It stops the curl watchers and unlinks the aggregator
 * behind IO->Data from the shared queue.
 * The return value is on a line not shown in this excerpt.
 */
195 eNextState RSSAggregator_Terminate(AsyncIO *IO)
197 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
199 EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
201 StopCurlWatchers(IO);
202 UnlinkRSSAggregator(RSSAggr);
/*
 * Terminate callback for the database phase, passed to InitcURLIOStruct in
 * rss_do_fetching. It stops the DB watchers on &RSSAggr->IO and unlinks the
 * aggregator. &RSSAggr->IO is presumably the same object as IO, since
 * rss_do_fetching passes &RSSAggr->IO.
 * It logs the same text as RSSAggregator_Terminate, so the two phases cannot
 * be told apart in the log. The return value is on a line not shown.
 */
206 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
208 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
210 EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
213 StopDBWatchers(&RSSAggr->IO);
214 UnlinkRSSAggregator(RSSAggr);
/*
 * Shutdown-abort callback, passed to InitcURLIOStruct in rss_do_fetching.
 * It logs the URL being abandoned, stops the curl watchers and unlinks the
 * aggregator.
 * NOTE(review): IO->ConnectMe is dereferenced without a NULL check; any
 * guard on pUrl is on lines not shown. Confirm ConnectMe is always set by
 * the time a shutdown abort can fire.
 */
218 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
221 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
223 pUrl = IO->ConnectMe->PlainUrl;
227 EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
229 StopCurlWatchers(IO);
230 UnlinkRSSAggregator(RSSAggr);
/*
 * When `link` is non-empty, append one HTML link line to Message:
 *     <a href="LINK">TEXT</a><br>\n
 * TEXT is chosen in this order:
 *  - LinkTitle, if it is non-empty;
 *  - otherwise Title, if it is non-NULL and non-empty;
 *  - otherwise, presumably through an `else` on a line not shown, the link
 *    text itself.
 * The remaining parameter declarations are on lines not shown. From the
 * callers in this file: link and LinkTitle are StrBufs, and Title is a C
 * string (NULL or "Reply to this").
 *
 * NOTE(review): link and LinkTitle come from the fetched feed and are
 * appended without HTML/attribute escaping. A `"` in the URL breaks out of
 * the href attribute, and markup in the title is injected as-is.
 */
234 void AppendLink(StrBuf *Message,
239 if (StrLength(link) > 0)
241 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
242 StrBufAppendBuf(Message, link, 0);
243 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
244 if (StrLength(LinkTitle) > 0)
245 StrBufAppendBuf(Message, LinkTitle, 0);
246 else if ((Title != NULL) && !IsEmptyStr(Title))
247 StrBufAppendBufPlain(Message, Title, -1, 0);
249 StrBufAppendBuf(Message, link, 0);
250 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
/*
 * Turn one parsed feed item (SaveMsg) into Citadel message fields:
 *  - cm_fields['A'] / ['P'] from author_or_creator and author_email,
 *    falling back to "rss" for 'A' when no author is known;
 *  - cm_fields['N'] = NODENAME;
 *  - cm_fields['U'] from the item title, HTML-stripped and RFC 2047 encoded;
 *  - SaveMsg->Message: an HTML body made of the description followed by the
 *    item link and the "Reply to this" link.
 * On the branches that use SmashStrBuf, ownership of the StrBuf contents
 * moves into cm_fields.
 */
255 void rss_format_item(networker_save_message *SaveMsg)
/*
 * Author: strip HTML from author_or_creator and trim it. If it contains no
 * '@' and an author_email is available, 'A' = the RFC 2047 encoded name and
 * 'P' = the email. The other visible branches set 'P' to a copy of 'A' or
 * to "rss@localhost"; the conditions selecting them are on lines not shown.
 */
260 if (SaveMsg->author_or_creator != NULL) {
263 StrBuf *Encoded = NULL;
266 From = html_to_ascii(ChrPtr(SaveMsg->author_or_creator),
267 StrLength(SaveMsg->author_or_creator),
269 StrBufPlain(SaveMsg->author_or_creator, From, -1);
270 StrBufTrim(SaveMsg->author_or_creator);
273 FromAt = strchr(ChrPtr(SaveMsg->author_or_creator), '@') != NULL;
274 if (!FromAt && StrLength (SaveMsg->author_email) > 0)
276 StrBufRFC2047encode(&Encoded, SaveMsg->author_or_creator);
277 SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded);
278 SaveMsg->Msg.cm_fields['P'] =
279 SmashStrBuf(&SaveMsg->author_email);
285 SaveMsg->Msg.cm_fields['A'] =
286 SmashStrBuf(&SaveMsg->author_or_creator);
287 SaveMsg->Msg.cm_fields['P'] =
288 strdup(SaveMsg->Msg.cm_fields['A']);
292 StrBufRFC2047encode(&Encoded,
293 SaveMsg->author_or_creator);
294 SaveMsg->Msg.cm_fields['A'] =
295 SmashStrBuf(&Encoded);
296 SaveMsg->Msg.cm_fields['P'] =
297 strdup("rss@localhost");
/* Presumably the branch taken when author_or_creator is NULL. */
303 SaveMsg->Msg.cm_fields['A'] = strdup("rss");
306 SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME);
/*
 * Subject: collapse whitespace, convert the HTML title to plain text
 * (html_to_ascii, 512 columns), drop a trailing newline, RFC 2047 encode
 * the result and store it in cm_fields['U'].
 * NOTE(review): `len` is set from the HTML title, but it indexes Sbj, the
 * html_to_ascii output, which can be shorter. Confirm that len is
 * recomputed from Sbj on a line not shown before Sbj[len - 1] is read.
 * NOTE(review): QPEncoded has no initializer; unless a line not shown sets
 * it (e.g. to NULL), StrBufRFC2047encode receives the address of an
 * indeterminate pointer.
 */
307 if (SaveMsg->title != NULL) {
310 StrBuf *Encoded, *QPEncoded;
313 StrBufSpaceToBlank(SaveMsg->title);
314 len = StrLength(SaveMsg->title);
315 Sbj = html_to_ascii(ChrPtr(SaveMsg->title), len, 512, 0);
317 if ((len > 0) && (Sbj[len - 1] == '\n'))
322 Encoded = NewStrBufPlain(Sbj, len);
326 StrBufRFC2047encode(&QPEncoded, Encoded);
328 SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded);
329 FreeStrBuf(&Encoded);
/* Make sure link is a valid (possibly empty) buffer for the body below. */
331 if (SaveMsg->link == NULL)
332 SaveMsg->link = NewStrBufPlain(HKEY(""));
334 #if 0 /* temporarily disable shorter urls. */
335 SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
336 GetShorterUrls(SaveMsg->description);
/*
 * Body: pre-size the buffer (msglen's initial value is set on a line not
 * shown), start with a text/html UTF-8 Content-type header, then append the
 * description, a blank line and the two links, and close the HTML.
 * NOTE(review): description is feed-supplied HTML appended verbatim, with
 * no sanitising.
 */
339 msglen += 1024 + StrLength(SaveMsg->link) + StrLength(SaveMsg->description) ;
341 Message = NewStrBufPlain(NULL, msglen);
343 StrBufPlain(Message, HKEY(
344 "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
346 #if 0 /* disable shorter url for now. */
347 SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
349 StrBufAppendBuf(Message, SaveMsg->description, 0);
350 StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
352 AppendLink(Message, SaveMsg->link, SaveMsg->linkTitle, NULL);
353 AppendLink(Message, SaveMsg->reLink, SaveMsg->reLinkTitle, "Reply to this");
354 StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
/* Ownership of the body buffer passes to SaveMsg. */
357 SaveMsg->Message = Message;
/*
 * DB-queue step that stores the current item (RSSAggr->ThisMsg):
 *  - format it and move the body into cm_fields['M'];
 *  - submit it to the aggregator's recipients (RSSAggr->recp);
 *  - record its GUID in the use table so it is not stored again. The `ut`
 *    record was prepared by RSS_FetchNetworkUsetableEntry.
 * It then advances to the next item in RSSAggr->Messages. If there is one,
 * it chains to RSS_FetchNetworkUsetableEntry to check that item.
 * What is returned once the list is exhausted is on lines not shown.
 */
360 eNextState RSSSaveMessage(AsyncIO *IO)
364 rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
366 rss_format_item(RSSAggr->ThisMsg);
368 RSSAggr->ThisMsg->Msg.cm_fields['M'] =
369 SmashStrBuf(&RSSAggr->ThisMsg->Message);
371 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
373 /* write the uidl to the use table so we don't store this item again */
374 cdb_store(CDB_USETABLE,
375 SKEY(RSSAggr->ThisMsg->MsgGUID),
376 &RSSAggr->ThisMsg->ut,
377 sizeof(struct UseTable) );
/* Move on to the next parsed item, if any, and check it against the table. */
379 if (GetNextHashPos(RSSAggr->Messages,
382 (void**) &RSSAggr->ThisMsg))
383 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
/*
 * DB-queue step that checks whether the current item (Ctx->ThisMsg) has
 * already been stored, using its GUID as the use-table key.
 *  - Seen items get their use-table timestamp refreshed and are skipped;
 *    processing continues with the next item via NextDBOperation.
 *  - Unseen items are handed to RSSSaveMessage.
 * Part of the branch structure between these paths is on lines not shown.
 */
388 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
392 struct cdbdata *cdbut;
393 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
395 /* Find out if we've already seen this item */
/*
 * NOTE(review): this strcpy copies a feed-supplied GUID into ut.ut_msgid
 * with no length check (hence the TODO). The size of ut_msgid is not
 * visible here. It is presumably a fixed-size array, because struct
 * UseTable is stored by value. A GUID longer than ut_msgid overflows it; a
 * bounded copy such as safestrncpy (already used in rss_do_fetching) would
 * avoid this.
 */
396 strcpy(Ctx->ThisMsg->ut.ut_msgid,
397 ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
398 Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
400 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
403 /* Item has already been seen */
404 EVRSSC_syslog(LOG_DEBUG,
405 "%s has already been seen\n",
406 ChrPtr(Ctx->ThisMsg->MsgGUID));
/* NOTE(review): confirm cdbut is released (cdb_free) on a line not shown. */
409 /* rewrite the record anyway, to update the timestamp */
410 cdb_store(CDB_USETABLE,
411 SKEY(Ctx->ThisMsg->MsgGUID),
412 &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
414 if (GetNextHashPos(Ctx->Messages,
417 (void**) &Ctx->ThisMsg))
418 return NextDBOperation(
420 RSS_FetchNetworkUsetableEntry);
427 NextDBOperation(IO, RSSSaveMessage);
/*
 * DB-queue step run after the HTTP fetch completes; RSSAggregator_FinishHttp
 * queues it.
 *  - Non-200 reply: log an alert and build an error text (ErrMsg). That text
 *    is passed, with the feed URL and room list, to a reporting call whose
 *    subject is "RSS Aggregation run failure". The call name and the return
 *    on this path are on lines not shown.
 *  - Otherwise: compute a whole-feed GUID = hex(MD5(reply body + URL)) +
 *    "_rssFM" (capped at 40 characters) and look it up in the use table. The
 *    record is always rewritten with the current time. If it already existed,
 *    the identical feed body was processed before and the step returns
 *    eAbort.
 *  - New content is passed to RSSAggregator_ParseReply.
 */
432 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
435 u_char rawdigest[MD5_DIGEST_LEN];
436 struct MD5Context md5context;
438 struct cdbdata *cdbut;
439 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
/* Anything other than HTTP 200 is reported as a failed aggregation run. */
441 if (IO->HttpReq.httpcode != 200)
447 ErrMsg = NewStrBuf();
448 EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n",
449 IO->HttpReq.httpcode);
451 strs[0] = ChrPtr(Ctx->Url);
452 lens[0] = StrLength(Ctx->Url);
454 strs[1] = ChrPtr(Ctx->rooms);
455 lens[1] = StrLength(Ctx->rooms);
457 "Error while RSS-Aggregation Run of %s\n"
458 " need a 200, got a %ld !\n"
459 " Response text was: \n"
462 IO->HttpReq.httpcode,
463 ChrPtr(IO->HttpReq.ReplyData));
466 "RSS Aggregation run failure",
467 2, strs, (long*) &lens);
/*
 * Fingerprint the whole reply together with its URL.
 * NOTE(review): assuming two hex digits per digest byte, the GUID is
 * 32 + 6 = 38 characters, so the 40-character cut below never triggers.
 * The "_rss2ctdl" size comment also no longer matches the "_rssFM" suffix.
 */
472 MD5Init(&md5context);
474 MD5Update(&md5context,
475 (const unsigned char*)SKEY(IO->HttpReq.ReplyData));
477 MD5Update(&md5context,
478 (const unsigned char*)SKEY(Ctx->Url));
480 MD5Final(rawdigest, &md5context);
481 guid = NewStrBufPlain(NULL,
482 MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
483 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
484 StrBufAppendBufPlain(guid, HKEY("_rssFM"), 0);
485 if (StrLength(guid) > 40)
486 StrBufCutAt(guid, 40, NULL);
487 /* Find out if we've already seen this item */
490 cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid));
492 memcpy(&ut, cdbut->ptr,
493 ((cdbut->len > sizeof(struct UseTable)) ?
494 sizeof(struct UseTable) : cdbut->len));
/*
 * NOTE(review): the test below fires when the stored record is OLDER than
 * its threshold (threshold not shown), while the comment says "seen in the
 * last 4 days". One of the two looks inverted. Only a debug log depends on
 * it.
 */
496 if (IO->Now - ut.ut_timestamp >
499 /* Item has already been seen in the last 4 days */
500 EVRSSC_syslog(LOG_DEBUG,
501 "%s has already been seen\n",
507 memcpy(ut.ut_msgid, SKEY(guid));
508 ut.ut_timestamp = IO->Now;
/*
 * NOTE(review): the memcpy into ut.ut_msgid above copies StrLength(guid)
 * bytes and no terminating NUL. If `ut` still holds bytes from a longer,
 * previously stored msgid, or was never zeroed (its declaration is not
 * shown), ut_msgid is not properly terminated.
 */
510 /* rewrite the record anyway, to update the timestamp */
511 cdb_store(CDB_USETABLE,
513 &ut, sizeof(struct UseTable) );
/*
 * Same feed body seen before: nothing new to parse.
 * NOTE(review): confirm cdbut is released with cdb_free, and guid with
 * FreeStrBuf, on lines not shown.
 */
515 if (cdbut != NULL) return eAbort;
517 return RSSAggregator_ParseReply(IO);
520 eNextState RSSAggregator_FinishHttp(AsyncIO *IO)
522 return QueueDBOperation(IO, RSSAggregator_AnalyseReply);
/*
 * Start fetching one feed.
 *  - While RSSAggr->next_poll is set and still in the future, it does not
 *    fetch; the early return is on a line not shown.
 *  - Otherwise it allocates a zeroed rss_item and initialises the curl
 *    AsyncIO with this module's completion and termination callbacks.
 *  - It copies the feed URL into the CitContext's cs_host, parses the URL
 *    with 80 as the default port, and queues the request.
 * rssclient_scan unlinks the aggregator when this returns 0. The actual
 * return statements (and the use of ri) are on lines not shown.
 * NOTE(review): the malloc() result for ri is not checked before memset.
 */
528 int rss_do_fetching(rss_aggregator *RSSAggr)
530 AsyncIO *IO = &RSSAggr->IO;
536 if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
539 ri = (rss_item*) malloc(sizeof(rss_item));
540 memset(ri, 0, sizeof(rss_item));
543 if (! InitcURLIOStruct(&RSSAggr->IO,
545 "Citadel RSS Client",
546 RSSAggregator_FinishHttp,
547 RSSAggregator_Terminate,
548 RSSAggregator_TerminateDB,
549 RSSAggregator_ShutdownAbort))
551 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
555 safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
556 ChrPtr(RSSAggr->Url),
557 sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
/* Resolve the URL (default port 80) and hand the request to curl. */
559 EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
560 ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
561 CurlPrepareURL(RSSAggr->IO.ConnectMe);
563 QueueCurlContext(&RSSAggr->IO);
568 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
/*
 * Per-room callback for CtdlForEachNetCfgRoom (see rssclient_scan).
 * For each "rssclient" line in the room's network config, it extracts the
 * feed URL from the line's first value and then does one of two things:
 *  - if an aggregator for that URL already exists, it attaches this room to
 *    it (room name appended to `rooms`, room number put into
 *    OtherQRnumbers);
 *  - otherwise it sets up a new rss_aggregator. Its registration in
 *    RSSFetchUrls / RSSQueueRooms happens on lines not shown.
 * Rooms already present in RSSQueueRooms are skipped entirely, and nothing
 * is done once the server is shutting down. RSSQueueMutex is held only
 * around the shared-hash lookups and updates.
 */
570 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
572 const RoomNetCfgLine *pLine;
573 rss_room_counter *Count = NULL;
574 rss_aggregator *RSSAggr = NULL;
575 rss_aggregator *use_this_RSSAggr = NULL;
578 pthread_mutex_lock(&RSSQueueMutex);
579 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
581 EVRSSQ_syslog(LOG_DEBUG,
582 "rssclient: [%ld] %s already in progress.\n",
585 pthread_mutex_unlock(&RSSQueueMutex);
588 pthread_mutex_unlock(&RSSQueueMutex);
590 if (server_shutting_down) return;
592 pLine = OneRNCFG->NetConfigs[rssclient];
594 while (pLine != NULL)
596 const char *lPtr = NULL;
601 sizeof(rss_room_counter));
/*
 * A fresh aggregator is built for every config line. If the URL turns out
 * to be known already, it is only used as a probe and its Url is freed;
 * the rest of that cleanup is on lines not shown.
 * NOTE(review): malloc results (Count, RSSAggr) are not checked.
 */
605 RSSAggr = (rss_aggregator *) malloc(
606 sizeof(rss_aggregator));
608 memset (RSSAggr, 0, sizeof(rss_aggregator));
609 RSSAggr->QRnumber = qrbuf->QRnumber;
610 RSSAggr->roomlist_parts = 1;
611 RSSAggr->Url = NewStrBufPlain(NULL, StrLength(pLine->Value[0]));
612 StrBufExtract_NextToken(RSSAggr->Url,
617 pthread_mutex_lock(&RSSQueueMutex);
618 GetHash(RSSFetchUrls,
/*
 * NOTE(review): this relies on vptr being NULL when the URL is not found.
 * Confirm that GetHash clears it on a miss, or that a line not shown does;
 * otherwise a pointer left over from an earlier lookup would be reused.
 */
622 use_this_RSSAggr = (rss_aggregator *)vptr;
623 if (use_this_RSSAggr != NULL)
626 StrBufAppendBufPlain(
627 use_this_RSSAggr->rooms,
/* First extra room for this URL: create the room-number hash lazily. */
630 if (use_this_RSSAggr->roomlist_parts==1)
632 use_this_RSSAggr->OtherQRnumbers
633 = NewHash(1, lFlathash);
635 QRnumber = (long*)malloc(sizeof(long));
636 *QRnumber = qrbuf->QRnumber;
637 Put(use_this_RSSAggr->OtherQRnumbers,
638 LKEY(qrbuf->QRnumber),
641 use_this_RSSAggr->roomlist_parts++;
643 pthread_mutex_unlock(&RSSQueueMutex);
645 FreeStrBuf(&RSSAggr->Url);
651 pthread_mutex_unlock(&RSSQueueMutex);
/* New URL: finish initialising the aggregator and register it. */
653 RSSAggr->ItemType = RSS_UNSET;
655 RSSAggr->rooms = NewStrBufPlain(
658 pthread_mutex_lock(&RSSQueueMutex);
665 pthread_mutex_unlock(&RSSQueueMutex);
671 * Scan for rooms that have RSS client requests configured
/*
 * Timer hook, registered with EVT_TIMER in the module init. It does nothing
 * when either of these holds:
 *  - less than 900 seconds have passed since last_run;
 *  - any feed URL or room from a previous run is still queued.
 * Otherwise it switches to the rss_CC system context, lets
 * rssclient_scan_room queue every configured feed, and then starts a fetch
 * for each queued URL. Aggregators whose fetch cannot be started are
 * unlinked immediately.
 */
673 void rssclient_scan(void) {
674 int RSSRoomCount, RSSCount;
675 rss_aggregator *rptr = NULL;
680 time_t now = time(NULL);
/*
 * NOTE(review): the log below prints time_t differences with %ld, which is
 * only correct where time_t is long. Casting to long would make it portable.
 */
682 /* Run no more than once every 15 minutes. */
683 if ((now - last_run) < 900) {
684 EVRSSQ_syslog(LOG_DEBUG,
685 "Client: polling interval not yet reached; last run was %ldm%lds ago",
686 ((now - last_run) / 60),
687 ((now - last_run) % 60)
693 * This is a simple concurrency check to make sure only one rssclient
694 * run is done at a time.
696 pthread_mutex_lock(&RSSQueueMutex);
697 RSSCount = GetCount(RSSFetchUrls);
698 RSSRoomCount = GetCount(RSSQueueRooms);
699 pthread_mutex_unlock(&RSSQueueMutex);
701 if ((RSSRoomCount > 0) || (RSSCount > 0)) {
702 EVRSSQ_syslog(LOG_DEBUG,
703 "rssclient: concurrency check failed; %d rooms and %d url's are queued",
704 RSSRoomCount, RSSCount
709 become_session(&rss_CC);
710 EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
711 CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient);
/*
 * NOTE(review): RSSQueueMutex is held across this whole loop, but
 * UnlinkRSSAggregator (called below when rss_do_fetching fails) locks
 * RSSQueueMutex again. The mutex is initialised with default attributes,
 * so it is not recursive, and that path self-deadlocks (undefined
 * behaviour per POSIX). UnlinkRSSAggregator also deletes the current entry
 * from RSSFetchUrls while `it` is iterating over it. Collecting the failed
 * aggregators and unlinking them after the unlock would avoid both
 * problems.
 */
713 pthread_mutex_lock(&RSSQueueMutex);
715 it = GetNewHashPos(RSSFetchUrls, 0);
716 while (!server_shutting_down &&
717 GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
719 rptr = (rss_aggregator *)vrptr;
720 if (!rss_do_fetching(rptr))
721 UnlinkRSSAggregator(rptr);
724 pthread_mutex_unlock(&RSSQueueMutex);
726 EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
730 void rss_cleanup(void)
732 /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
733 DeleteHash(&RSSFetchUrls);
734 DeleteHash(&RSSQueueRooms);
737 void LogDebugEnableRSSClient(const int n)
739 RSSClientDebugEnabled = n;
/*
 * Module initialisation. It does the following:
 *  - registers the "rssclient" room-config type (generic parser and
 *    serializer);
 *  - creates RSSQueueMutex and the two queue hashes (rooms keyed by number,
 *    URLs by string);
 *  - logs the libcurl version;
 *  - registers the timer hook (rssclient_scan), the event-loop cleanup hook
 *    (rss_cleanup) and the "rssclient" debug flag;
 *  - prepares rss_CC as the "rssclient" system context.
 * Any surrounding conditions and the return value are on lines not shown.
 */
742 CTDL_MODULE_INIT(rssclient)
746 CtdlREGISTERRoomCfgType(rssclient, ParseGeneric, 0, 1, SerializeGeneric, DeleteGenericCfgLine); /// todo: implement rss specific parser
/*
 * Default (NULL) attributes: the mutex is not recursive, so code that
 * already holds it must not call functions that lock it again.
 */
747 pthread_mutex_init(&RSSQueueMutex, NULL);
748 RSSQueueRooms = NewHash(1, lFlathash);
749 RSSFetchUrls = NewHash(1, NULL);
750 syslog(LOG_INFO, "%s\n", curl_version());
751 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
752 CtdlRegisterEVCleanupHook(rss_cleanup);
753 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
757 CtdlFillSystemContext(&rss_CC, "rssclient");