2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2010 by the citadel.org team
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/types.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
46 #include "citserver.h"
50 #include "ctdl_module.h"
52 #include "parsedate.h"
54 #include "citadel_dirs.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
/*
 * Temporary message-field indices. In the visible source the two
 * SHORTER_URL values appear only as cm_fields indices inside the disabled
 * (#if 0) parts of rss_save_item; TMP_MSGDATA is not referenced here.
 */
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
/* Shared scheduling state; both hashes are created in the module init. */
67 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
68 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
69 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
/* Declared early because rss_do_fetching passes both to InitcURLIOStruct. */
71 eNextState RSSAggregator_Terminate(AsyncIO *IO);
72 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
/* Context filled by CtdlFillSystemContext and adopted through become_session. */
73 struct CitContext rss_CC;
/* NOTE(review): rnclist is not referenced in the visible source - confirm it is still needed. */
75 struct rssnetcfg *rnclist = NULL;
/*
 * Append an HTML anchor for link to Message, followed by a <br> and a
 * newline. Nothing is appended when link has zero length.
 *
 * The anchor text is LinkTitle when it is non-empty, otherwise Title when
 * it is a non-NULL, non-empty C string. A third call appends link itself as
 * the anchor text; presumably it sits in a final else branch (that line is
 * not visible in this view) - confirm.
 *
 * NOTE(review): link is appended into the href attribute as-is; no escaping
 * is visible here.
 */
76 void AppendLink(StrBuf *Message,
81 if (StrLength(link) > 0)
83 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
84 StrBufAppendBuf(Message, link, 0);
85 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
86 if (StrLength(LinkTitle) > 0)
87 StrBufAppendBuf(Message, LinkTitle, 0);
88 else if ((Title != NULL) && !IsEmptyStr(Title))
89 StrBufAppendBufPlain(Message, Title, -1, 0);
91 StrBufAppendBuf(Message, link, 0);
92 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
/*
 * Look up the rss_room_counter stored in RSSQueueRooms under QRnumber and
 * delete that hash entry once its count is zero.
 *
 * NOTE(review): the statement between the cast and the count test is not
 * visible in this view; presumably the counter is decremented there -
 * confirm. No locking is visible in this function, so callers presumably
 * hold RSSQueueMutex - confirm.
 */
97 void DeleteRoomReference(long QRnumber)
103 rss_room_counter *pRoomC;
105 At = GetNewHashPos(RSSQueueRooms, 0);
/* Position the cursor on the entry for this room, if there is one. */
107 if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
109 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
112 pRoomC = (rss_room_counter *) vData;
114 if (pRoomC->count == 0)
115 DeleteEntryFromHash(RSSQueueRooms, At);
/*
 * Drop the room references held by an aggregator: first the primary room
 * (Cfg->QRnumber), then, when Cfg->OtherQRnumbers exists, every room number
 * stored in that hash. The walk over the additional rooms stops early when
 * server_shutting_down is set.
 */
121 void UnlinkRooms(rss_aggregator *Cfg)
123 DeleteRoomReference(Cfg->QRnumber);
124 if (Cfg->OtherQRnumbers != NULL)
131 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
132 while (! server_shutting_down &&
133 GetNextHashPos(Cfg->OtherQRnumbers,
/* Each stored value is a heap-allocated long (see rssclient_scan_room). */
139 long *lData = (long*) vData;
140 DeleteRoomReference(*lData);
/*
 * Remove the aggregator from RSSFetchUrls, where it is keyed by Cfg->Url,
 * and stamp last_run with the current time. rssclient_scan compares
 * last_run against the clock to space out its runs.
 *
 * NOTE(review): entries are stored with DeleteRssCfg as the fourth Put
 * argument, so deleting the entry presumably destroys Cfg - confirm, and
 * check that no caller touches Cfg afterwards. Several lines of this
 * function are not visible in this view.
 */
147 void UnlinkRSSAggregator(rss_aggregator *Cfg)
153 At = GetNewHashPos(RSSFetchUrls, 0);
154 if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
156 DeleteEntryFromHash(RSSFetchUrls, At);
159 last_run = time(NULL);
/*
 * Release what a networker_save_message owns: the contents of the embedded
 * message plus the Message and MsgGUID buffers. rss_save_item passes this
 * function to Put when it queues an item in Cfg->Messages.
 *
 * vMsg - pointer to a networker_save_message, passed as void pointer.
 *
 * NOTE(review): whether the struct itself is freed is not visible in this
 * view - confirm.
 */
162 void FreeNetworkSaveMessage (void *vMsg)
164 networker_save_message *Msg = (networker_save_message *) vMsg;
166 CtdlFreeMessageContents(&Msg->Msg);
167 FreeStrBuf(&Msg->Message);
168 FreeStrBuf(&Msg->MsgGUID);
/*
 * Abort handler stub: ignores IO and always returns eAbort. The original
 * author marked it TODO, so no cleanup is performed here.
 */
172 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
174 return eAbort; ///TODO
/*
 * Store the current queued item (Ctx->ThisMsg) as a message, record its
 * GUID in the use table, then move on to the next queued item.
 *
 * IO - async IO handle; IO->Data is the owning rss_aggregator.
 *
 * When GetNextHashPos yields another item, the next step scheduled is
 * RSS_FetchNetworkUsetableEntry. The return value for the case where no
 * item is left is not visible in this view.
 */
177 eNextState RSSSaveMessage(AsyncIO *IO)
181 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
/* The body text becomes the M field; presumably SmashStrBuf hands over the buffer - confirm. */
183 Ctx->ThisMsg->Msg.cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message);
185 CtdlSubmitMsg(&Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0);
187 /* write the uidl to the use table so we don't store this item again */
188 cdb_store(CDB_USETABLE,
189 SKEY(Ctx->ThisMsg->MsgGUID),
191 sizeof(struct UseTable) );
193 if (GetNextHashPos(Ctx->Messages,
196 (void**) &Ctx->ThisMsg))
197 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
/*
 * Decide whether the current queued item (Ctx->ThisMsg) still needs to be
 * saved, using the use table keyed by the item GUID.
 *
 * IO - async IO handle; IO->Data is the owning rss_aggregator.
 *
 * The use-table record is prepared first (GUID copied into ut.ut_msgid,
 * timestamp set to now), then the table is queried. For an item that was
 * seen before, the record is rewritten to refresh its timestamp and the
 * next queued item is checked by scheduling this function again. The last
 * visible statement schedules RSSSaveMessage; presumably that is the path
 * for unseen items. The branch conditions and return statements are not
 * visible in this view - confirm.
 *
 * NOTE(review): the strcpy into ut.ut_msgid is unbounded (the original
 * author flagged it TODO). Confirm ut_msgid can hold any GUID produced by
 * rss_save_item, or switch to a bounded copy.
 * NOTE(review): no release of cdbut is visible here - confirm it is freed.
 */
202 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
206 struct cdbdata *cdbut;
207 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
209 /* Find out if we've already seen this item */
210 strcpy(Ctx->ThisMsg->ut.ut_msgid,
211 ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
212 Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
214 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
217 /* Item has already been seen */
219 "%s has already been seen\n",
220 ChrPtr(Ctx->ThisMsg->MsgGUID));
223 /* rewrite the record anyway, to update the timestamp */
224 cdb_store(CDB_USETABLE,
225 SKEY(Ctx->ThisMsg->MsgGUID),
226 &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
228 if (GetNextHashPos(Ctx->Messages,
231 (void**) &Ctx->ThisMsg))
232 return NextDBOperation(
234 RSS_FetchNetworkUsetableEntry);
241 NextDBOperation(IO, RSSSaveMessage);
247 * Commit a fetched and parsed RSS item to disk
/*
 * Turn one parsed feed item into a networker_save_message and queue it in
 * Cfg->Messages under the next integer key (current count plus one), with
 * FreeNetworkSaveMessage passed to Put alongside it.
 *
 * ri  - parsed item. Several of its buffers are modified in place: guid is
 *       trimmed, title and description go through StrBufSpaceToBlank, a
 *       missing description or link is replaced by an empty buffer, and
 *       the author buffers may be consumed by SmashStrBuf.
 * Cfg - aggregator whose Messages hash receives the new entry.
 *
 * The use-table GUID is rss/ followed by the item guid when the item has
 * one. The MD5 code further down hashes title and link and appends
 * _rss2ctdl; presumably that is the fallback for items without a guid (the
 * else line is not visible in this view).
 *
 * NOTE(review): the malloc result is handed to memset without a NULL
 * check.
 */
249 void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
251 networker_save_message *SaveMsg;
252 struct MD5Context md5context;
253 u_char rawdigest[MD5_DIGEST_LEN];
257 AsyncIO *IO = &Cfg->IO;
261 SaveMsg = (networker_save_message *) malloc(
262 sizeof(networker_save_message));
263 memset(SaveMsg, 0, sizeof(networker_save_message));
265 /* Construct a GUID to use in the S_USETABLE table.
266 * If one is not present in the item itself, make one up.
268 if (ri->guid != NULL) {
269 StrBufSpaceToBlank(ri->guid);
270 StrBufTrim(ri->guid);
271 guid = NewStrBufPlain(HKEY("rss/"));
272 StrBufAppendBuf(guid, ri->guid, 0);
275 MD5Init(&md5context);
276 if (ri->title != NULL) {
277 MD5Update(&md5context,
278 (const unsigned char*)SKEY(ri->title));
280 if (ri->link != NULL) {
281 MD5Update(&md5context,
282 (const unsigned char*)SKEY(ri->link));
284 MD5Final(rawdigest, &md5context);
285 guid = NewStrBufPlain(NULL,
286 MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
287 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
288 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
291 /* translate Item into message. */
292 EVM_syslog(LOG_DEBUG, "RSS: translating item...\n");
293 if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
294 StrBufSpaceToBlank(ri->description);
295 SaveMsg->Msg.cm_magic = CTDLMESSAGE_MAGIC;
296 SaveMsg->Msg.cm_anon_type = MES_NORMAL;
297 SaveMsg->Msg.cm_format_type = FMT_RFC822;
299 if (ri->guid != NULL) {
300 SaveMsg->Msg.cm_fields['E'] = strdup(ChrPtr(ri->guid));
/*
 * Author fields A and P. The author text is first converted with
 * html_to_ascii and trimmed. Without an at-sign in the author and with a
 * non-empty author_email, A is the RFC 2047 encoded author and P is the
 * email. The two later assignment pairs (P copied from A, and P set to
 * rss@localhost) sit in branches whose conditions are not visible here.
 */
303 if (ri->author_or_creator != NULL) {
305 StrBuf *Encoded = NULL;
308 From = html_to_ascii(ChrPtr(ri->author_or_creator),
309 StrLength(ri->author_or_creator),
311 StrBufPlain(ri->author_or_creator, From, -1);
312 StrBufTrim(ri->author_or_creator);
315 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
316 if (!FromAt && StrLength (ri->author_email) > 0)
318 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
319 SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded);
320 SaveMsg->Msg.cm_fields['P'] =
321 SmashStrBuf(&ri->author_email);
327 SaveMsg->Msg.cm_fields['A'] =
328 SmashStrBuf(&ri->author_or_creator);
329 SaveMsg->Msg.cm_fields['P'] =
330 strdup(SaveMsg->Msg.cm_fields['A']);
334 StrBufRFC2047encode(&Encoded,
335 ri->author_or_creator);
336 SaveMsg->Msg.cm_fields['A'] =
337 SmashStrBuf(&Encoded);
338 SaveMsg->Msg.cm_fields['P'] =
339 strdup("rss@localhost");
/* A non-positive pubdate is replaced by the current time. */
342 if (ri->pubdate <= 0) {
343 ri->pubdate = time(NULL);
348 SaveMsg->Msg.cm_fields['A'] = strdup("rss");
351 SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME);
/*
 * Subject field U: title converted with html_to_ascii, then RFC 2047
 * encoded. NOTE(review): any update of len after the conversion is not
 * visible here; if len is still the title length, the Sbj[len - 1] test
 * can read outside the converted string (index -1 for an empty title) -
 * confirm against the hidden lines.
 */
352 if (ri->title != NULL) {
355 StrBuf *Encoded, *QPEncoded;
358 StrBufSpaceToBlank(ri->title);
359 len = StrLength(ri->title);
360 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
362 if (Sbj[len - 1] == '\n')
367 Encoded = NewStrBufPlain(Sbj, len);
371 StrBufRFC2047encode(&QPEncoded, Encoded);
373 SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded);
374 FreeStrBuf(&Encoded);
/* Field T carries pubdate printed as a decimal long. */
376 SaveMsg->Msg.cm_fields['T'] = malloc(64);
377 snprintf(SaveMsg->Msg.cm_fields['T'], 64, "%ld", ri->pubdate);
378 if (ri->channel_title != NULL) {
379 if (StrLength(ri->channel_title) > 0) {
380 SaveMsg->Msg.cm_fields['O'] =
381 strdup(ChrPtr(ri->channel_title));
384 if (ri->link == NULL)
385 ri->link = NewStrBufPlain(HKEY(""));
387 #if 0 /* temporarily disable shorter urls. */
388 SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
389 GetShorterUrls(ri->description);
392 msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
394 Message = NewStrBufPlain(NULL, StrLength(ri->description));
396 StrBufPlain(Message, HKEY(
397 "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
399 #if 0 /* disable shorter url for now. */
400 SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
402 StrBufAppendBuf(Message, ri->description, 0);
403 StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
405 AppendLink(Message, ri->link, ri->linkTitle, NULL);
406 AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
407 StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
409 SaveMsg->MsgGUID = guid;
410 SaveMsg->Message = Message;
412 n = GetCount(Cfg->Messages) + 1;
413 Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage);
/*
 * Start an asynchronous fetch of the feed described by Cfg.
 *
 * Cfg - aggregator to fetch; its IO member is initialised here with
 *       RSSAggregator_ParseReply, RSSAggregator_Terminate and
 *       RSSAggregator_ShutdownAbort as callbacks.
 *
 * Returns an int; rssclient_scan treats a zero result as failure and
 * unlinks the aggregator. The return statements themselves are not visible
 * in this view.
 *
 * A fetch is skipped while Cfg->next_poll is non-zero and still in the
 * future. Failure of InitcURLIOStruct is logged at LOG_ALERT.
 *
 * NOTE(review): the malloc result is handed to memset without a NULL
 * check, and the line that stores ri (presumably into Cfg->Item, which
 * DeleteRssCfg releases) is not visible - confirm.
 */
421 int rss_do_fetching(rss_aggregator *Cfg)
428 if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
431 ri = (rss_item*) malloc(sizeof(rss_item));
432 memset(ri, 0, sizeof(rss_item));
435 if (! InitcURLIOStruct(&Cfg->IO,
437 "Citadel RSS Client",
438 RSSAggregator_ParseReply,
439 RSSAggregator_Terminate,
440 RSSAggregator_ShutdownAbort))
442 syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
446 safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host,
448 sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host));
450 syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
/* 80 is passed to ParseURL as its third argument, presumably the default port. */
451 ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80);
452 CurlPrepareURL(Cfg->IO.ConnectMe);
454 QueueCurlContext(&Cfg->IO);
/*
 * Release the resources held by an rss_aggregator. rssclient_scan_room
 * passes this function to Put when it stores an aggregator in
 * RSSFetchUrls.
 *
 * vptr - pointer to an rss_aggregator, passed as void pointer.
 *
 * Freed here: the Url, rooms, CData and Key buffers, the HTTP reply data,
 * the OtherQRnumbers and Messages hashes, the parsed URL, the hash
 * position, the recipient room string, and every string buffer of the
 * current Item.
 *
 * NOTE(review): freeing of the Item struct and of rncptr itself is not
 * visible in this view - confirm.
 */
459 void DeleteRssCfg(void *vptr)
461 rss_aggregator *rncptr = (rss_aggregator *)vptr;
462 AsyncIO *IO = &rncptr->IO;
463 EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
465 FreeStrBuf(&rncptr->Url);
466 FreeStrBuf(&rncptr->rooms);
467 FreeStrBuf(&rncptr->CData);
468 FreeStrBuf(&rncptr->Key);
469 FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
470 DeleteHash(&rncptr->OtherQRnumbers);
471 FreeURL(&rncptr->IO.ConnectMe);
473 DeleteHashPos (&rncptr->Pos);
474 DeleteHash (&rncptr->Messages);
475 if (rncptr->recp.recp_room != NULL)
476 free(rncptr->recp.recp_room);
/* The item being parsed owns its own string buffers; release each one. */
479 if (rncptr->Item != NULL)
481 FreeStrBuf(&rncptr->Item->guid);
482 FreeStrBuf(&rncptr->Item->title);
483 FreeStrBuf(&rncptr->Item->link);
484 FreeStrBuf(&rncptr->Item->linkTitle);
485 FreeStrBuf(&rncptr->Item->reLink);
486 FreeStrBuf(&rncptr->Item->reLinkTitle);
487 FreeStrBuf(&rncptr->Item->description);
488 FreeStrBuf(&rncptr->Item->channel_title);
489 FreeStrBuf(&rncptr->Item->author_or_creator);
490 FreeStrBuf(&rncptr->Item->author_url);
491 FreeStrBuf(&rncptr->Item->author_email);
/*
 * Termination callback registered through InitcURLIOStruct: logs the event
 * and unlinks the aggregator found in IO->Data. The returned eNextState is
 * not visible in this view.
 */
498 eNextState RSSAggregator_Terminate(AsyncIO *IO)
500 rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
502 EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
505 UnlinkRSSAggregator(rncptr);
/*
 * Shutdown callback registered through InitcURLIOStruct: logs the plain
 * URL of the interrupted fetch and unlinks the aggregator found in
 * IO->Data. The returned eNextState is not visible in this view.
 *
 * NOTE(review): IO->ConnectMe is dereferenced directly; any NULL guard
 * would have to be on lines not visible here - confirm.
 */
508 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
511 rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
513 pUrl = IO->ConnectMe->PlainUrl;
517 EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
520 UnlinkRSSAggregator(rncptr);
/*
 * Per-room callback handed to CtdlForEachRoom by rssclient_scan.
 *
 * qrbuf - the room being examined.
 * data  - not used in the visible code.
 *
 * Steps visible in this view:
 *  - With RSSQueueMutex held, check whether the room number is already in
 *    RSSQueueRooms; such a room is logged as already in progress.
 *  - Build the netconfig file name with assoc_file_name, open the file,
 *    fstat it and read the whole content into CfgData.
 *  - Split CfgData into lines. For a line whose first pipe-separated token
 *    equals rssclient (compared case-insensitively), allocate a room
 *    counter and a zeroed rss_aggregator and read the next token as the
 *    feed URL.
 *  - With the mutex held, look the URL up in RSSFetchUrls. For a known URL
 *    the room is added to the existing aggregator: text is appended to its
 *    rooms buffer, OtherQRnumbers is created while roomlist_parts is still
 *    1, a heap-allocated copy of the room number is stored in it,
 *    roomlist_parts is incremented, and the freshly read URL is freed.
 *    Otherwise the new aggregator gets ItemType RSS_UNSET, the room name
 *    as its rooms buffer, and is stored in RSSFetchUrls with DeleteRssCfg.
 *  - The room counter is stored in RSSQueueRooms under the room number.
 *  - server_shutting_down is tested between the steps.
 *
 * NOTE(review): the actions taken on the early-exit tests, the closing of
 * fd, and the release of Line and of an unused rncptr are not visible in
 * this view - confirm. The read-error message prints strerror(errno)
 * although StrBufReadBLOB is given its own Err argument, and it ends in
 * an HTML br tag even though it goes to syslog - confirm both are
 * intended.
 */
525 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
527 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
529 StrBuf *CfgData=NULL;
532 rss_room_counter *Count = NULL;
534 char filename[PATH_MAX];
537 rss_aggregator *rncptr = NULL;
538 rss_aggregator *use_this_rncptr = NULL;
540 const char *CfgPtr, *lPtr;
543 pthread_mutex_lock(&RSSQueueMutex);
544 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
547 "rssclient: [%ld] %s already in progress.\n",
550 pthread_mutex_unlock(&RSSQueueMutex);
553 pthread_mutex_unlock(&RSSQueueMutex);
555 assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
557 if (server_shutting_down)
560 /* Only do net processing for rooms that have netconfigs */
561 fd = open(filename, 0);
564 "rssclient: %s no config.\n",
569 if (server_shutting_down)
572 if (fstat(fd, &statbuf) == -1) {
574 "ERROR: could not stat configfile '%s' - %s\n",
580 if (server_shutting_down)
583 CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
585 if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
587 FreeStrBuf(&CfgData);
588 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
589 filename, strerror(errno));
593 if (server_shutting_down)
597 CfgType = NewStrBuf();
598 Line = NewStrBufPlain(NULL, StrLength(CfgData));
602 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
603 if (StrLength(Line) > 0)
606 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
607 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
611 Count = malloc(sizeof(rss_room_counter));
615 rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
616 memset (rncptr, 0, sizeof(rss_aggregator));
617 rncptr->roomlist_parts = 1;
618 rncptr->Url = NewStrBuf();
619 StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
621 pthread_mutex_lock(&RSSQueueMutex);
622 GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
623 use_this_rncptr = (rss_aggregator *)vptr;
624 if (use_this_rncptr != NULL)
627 StrBufAppendBufPlain(use_this_rncptr->rooms,
630 if (use_this_rncptr->roomlist_parts == 1)
632 use_this_rncptr->OtherQRnumbers =
633 NewHash(1, lFlathash);
635 QRnumber = (long*)malloc(sizeof(long));
636 *QRnumber = qrbuf->QRnumber;
637 Put(use_this_rncptr->OtherQRnumbers,
638 LKEY(qrbuf->QRnumber),
641 use_this_rncptr->roomlist_parts++;
643 pthread_mutex_unlock(&RSSQueueMutex);
645 FreeStrBuf(&rncptr->Url);
650 pthread_mutex_unlock(&RSSQueueMutex);
652 rncptr->ItemType = RSS_UNSET;
654 rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
656 pthread_mutex_lock(&RSSQueueMutex);
657 Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
658 pthread_mutex_unlock(&RSSQueueMutex);
664 Count->QRnumber = qrbuf->QRnumber;
665 pthread_mutex_lock(&RSSQueueMutex);
666 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
667 qrbuf->QRnumber, qrbuf->QRname);
668 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
669 pthread_mutex_unlock(&RSSQueueMutex);
671 FreeStrBuf(&CfgData);
672 FreeStrBuf(&CfgType);
677 * Scan for rooms that have RSS client requests configured
679 void rssclient_scan(void) {
680 static int doing_rssclient = 0;
681 rss_aggregator *rptr = NULL;
687 /* Run no more than once every 15 minutes. */
688 if ((time(NULL) - last_run) < 900) {
693 * This is a simple concurrency check to make sure only one rssclient
694 * run is done at a time. We could do this with a mutex, but since we
695 * don't really require extremely fine granularity here, we'll do it
696 * with a static variable instead.
/*
 * Flow of the rest of this timer hook (registered with EVT_TIMER in the
 * module init): return while a run is flagged as active; test whether
 * either queue hash still holds entries (the action taken is not visible
 * in this view, presumably an early return); adopt the rss_CC session;
 * let rssclient_scan_room inspect every room; then, holding
 * RSSQueueMutex, call rss_do_fetching for each aggregator in RSSFetchUrls
 * and unlink those for which it returns zero. The loop stops early when
 * server_shutting_down is set.
 *
 * NOTE(review): the lines that set and clear doing_rssclient are not
 * visible here. UnlinkRSSAggregator deletes from RSSFetchUrls while this
 * loop is iterating it with the position it - confirm the hash iterator
 * tolerates that.
 */
698 if (doing_rssclient) return;
700 if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
703 become_session(&rss_CC);
704 syslog(LOG_DEBUG, "rssclient started\n");
705 CtdlForEachRoom(rssclient_scan_room, NULL);
707 pthread_mutex_lock(&RSSQueueMutex);
709 it = GetNewHashPos(RSSFetchUrls, 0);
710 while (!server_shutting_down &&
711 GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
713 rptr = (rss_aggregator *)vrptr;
714 if (!rss_do_fetching(rptr))
715 UnlinkRSSAggregator(rptr);
718 pthread_mutex_unlock(&RSSQueueMutex);
720 syslog(LOG_DEBUG, "rssclient ended\n");
/*
 * Cleanup hook registered in the module init: deletes the two global
 * hashes, RSSFetchUrls first. RSSQueueMutex is not destroyed; that call is
 * commented out and marked TODO by the original author.
 */
725 void rss_cleanup(void)
727 /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
728 DeleteHash(&RSSFetchUrls);
729 DeleteHash(&RSSQueueRooms);
/*
 * Module entry point for the RSS client. Visible setup steps: fill the
 * rss_CC system context under the name rssclient, initialise
 * RSSQueueMutex, create the two global hashes (RSSQueueRooms with
 * lFlathash, RSSFetchUrls with a NULL hash function argument), log the
 * libcurl version string, and register rssclient_scan as a timer hook and
 * rss_cleanup as a cleanup hook.
 *
 * NOTE(review): any guard around these steps and the value returned by the
 * macro body are not visible in this view.
 */
733 CTDL_MODULE_INIT(rssclient)
737 CtdlFillSystemContext(&rss_CC, "rssclient");
738 pthread_mutex_init(&RSSQueueMutex, NULL);
739 RSSQueueRooms = NewHash(1, lFlathash);
740 RSSFetchUrls = NewHash(1, NULL);
741 syslog(LOG_INFO, "%s\n", curl_version());
742 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
743 CtdlRegisterCleanupHook(rss_cleanup);