2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2010 by the citadel.org team
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/types.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
46 #include "citserver.h"
50 #include "ctdl_module.h"
52 #include "parsedate.h"
54 #include "citadel_dirs.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
/*
 * Pseudo field indices into CtdlMessage.cm_fields reserved for temporary data.
 * In the visible code TMP_SHORTER_URLS and TMP_SHORTER_URL_OFFSET are only
 * referenced inside the disabled ("#if 0") shorter-URL sections of
 * rss_save_item(); TMP_MSGDATA is not referenced at all here.
 */
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
67 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
/*
 * Rooms that currently have an RSS run in progress: keyed by room number
 * (LKEY(QRnumber)), values are rss_room_counter.
 */
68 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
/*
 * Feed aggregators keyed by feed URL (SKEY(Url)). One entry is shared by
 * every room subscribed to the same URL; entries are Put() with DeleteRssCfg()
 * as their destructor.
 */
69 HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */
/* libcurl terminate / shutdown-abort callbacks handed to evcurl_init() in rss_do_fetching(). */
71 eNextState RSSAggregatorTerminate(AsyncIO *IO);
72 eNextState RSSAggregatorShutdownAbort(AsyncIO *IO);
/* System context template; every async job runs on its own CloneContext() copy. */
73 struct CitContext rss_CC;
/* NOTE(review): rnclist is not referenced anywhere in the visible code; possibly a leftover. */
75 struct rssnetcfg *rnclist = NULL;
/*
 * AppendLink() - append an HTML hyperlink to Message.
 *
 * Does nothing unless link is non-empty. The anchor text is LinkTitle when it
 * is non-empty, otherwise Title when it is non-NULL and non-empty. The
 * StrBufAppendBuf(Message, link, 0) after those branches is presumably the
 * final "else" fallback that uses the URL itself as the text; the source line
 * between them is not present in this listing, so confirm. The anchor is
 * closed with "</a><br>\n".
 *
 * NOTE(review): link and title come from the feed and are appended without
 * any visible HTML/attribute escaping; a '"' in the URL would break out of
 * the href attribute.
 */
76 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
78 if (StrLength(link) > 0)
80 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
81 StrBufAppendBuf(Message, link, 0);
82 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
83 if (StrLength(LinkTitle) > 0)
84 StrBufAppendBuf(Message, LinkTitle, 0);
85 else if ((Title != NULL) && !IsEmptyStr(Title))
86 StrBufAppendBufPlain(Message, Title, -1, 0);
88 StrBufAppendBuf(Message, link, 0);
89 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
/*
 * DeleteRoomReference() - drop one reference to room QRnumber in RSSQueueRooms.
 *
 * Looks the room up by its number. When it is found, its rss_room_counter is
 * fetched, and once that counter's count is zero the entry is removed from the
 * hash. The decrement of pRoomC->count is not visible in this listing; it is
 * presumably on the omitted lines between the cast and the test, so confirm.
 *
 * Touches RSSQueueRooms without taking RSSQueueMutex itself; callers are
 * presumably expected to hold it, so confirm.
 */
94 void DeleteRoomReference(long QRnumber)
100 rss_room_counter *pRoomC;
102 At = GetNewHashPos(RSSQueueRooms, 0);
104 if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
106 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
109 pRoomC = (rss_room_counter *) vData;
/* last reference gone: forget the room */
111 if (pRoomC->count == 0)
112 DeleteEntryFromHash(RSSQueueRooms, At);
/*
 * UnlinkRooms() - release the RSSQueueRooms references held by a feed config:
 * first the primary room (Cfg->QRnumber), then every additional room number
 * stored in Cfg->OtherQRnumbers, if that hash exists.
 * The loop condition continues on a line not shown here. The body of the
 * trailing server_shutting_down check is also not visible.
 */
118 void UnlinkRooms(rss_aggregator *Cfg)
121 DeleteRoomReference(Cfg->QRnumber);
122 if (Cfg->OtherQRnumbers != NULL)
129 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
130 while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) &&
/* values are heap-allocated longs holding room numbers (see rssclient_scan_room) */
133 long *lData = (long*) vData;
134 DeleteRoomReference(*lData);
137 if (server_shutting_down)
/*
 * UnlinkRSSAggregator() - remove Cfg from RSSFetchUrls (looked up by Cfg->Url)
 * and record the current time in last_run.
 *
 * RSSFetchUrls entries are Put() with DeleteRssCfg as destructor (see
 * rssclient_scan_room), so deleting the entry presumably frees Cfg. Callers
 * must not touch Cfg after this returns.
 * FreeNetworkSaveMessage() and rssclient_scan() hold RSSQueueMutex around
 * their calls to this function.
 */
144 void UnlinkRSSAggregator(rss_aggregator *Cfg)
150 At = GetNewHashPos(RSSFetchUrls, 0);
151 if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
153 DeleteEntryFromHash(RSSFetchUrls, At);
/* restarts the 15-minute throttle checked in rssclient_scan() */
156 last_run = time(NULL);
/*
 * FreeNetworkSaveMessage() - IO.Terminate callback for a save job queued by
 * RSSAddSaveMessage().
 *
 * Under RSSQueueMutex, drops the job's reference on its rss_aggregator and
 * unlinks the aggregator when the last reference goes away. It then frees
 * the message, its recipients, and the body and GUID buffers, and flags the
 * job's cloned CitContext as idle and to be killed (kill_me). The return
 * statement is not visible in this listing.
 *
 * NOTE(review): a second function with this exact name but a different
 * signature, FreeNetworkSaveMessage(void *), is defined right below. C has no
 * overloading, so this is a conflicting redefinition; the Put() destructor in
 * rss_save_item() and this Terminate callback need distinct names.
 * NOTE(review): this callback treats IO->Data as networker_save_message, while
 * RSS_FetchNetworkUsetableEntry(), into which the same IO is queued, treats
 * IO->Data as rss_aggregator. Confirm which one is actually stored.
 */
159 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
161 networker_save_message *Ctx = (networker_save_message *) IO->Data;
163 pthread_mutex_lock(&RSSQueueMutex);
164 Ctx->Cfg->RefCount --;
166 if (Ctx->Cfg->RefCount == 0)
168 UnlinkRSSAggregator(Ctx->Cfg);
171 pthread_mutex_unlock(&RSSQueueMutex);
173 CtdlFreeMessage(Ctx->Msg);
174 free_recipients(Ctx->recp);
175 FreeStrBuf(&Ctx->Message);
176 FreeStrBuf(&Ctx->MsgGUID);
177 ((struct CitContext*)IO->CitContext)->state = CON_IDLE;
178 ((struct CitContext*)IO->CitContext)->kill_me = 1;
180 last_run = time(NULL);
/*
 * FreeNetworkSaveMessage(void *) - hash payload destructor for entries of an
 * aggregator's Messages list (registered through Put() in rss_save_item()).
 * Frees the message and its body and GUID buffers. Whether the container
 * itself is freed is not visible in this listing.
 *
 * NOTE(review): this conflicts with the eNextState
 * FreeNetworkSaveMessage(AsyncIO *) defined directly above; one of the two
 * must be renamed for the file to compile.
 */
184 void FreeNetworkSaveMessage (void *vMsg)
186 networker_save_message *Msg = (networker_save_message *) vMsg;
188 CtdlFreeMessage(Msg->Msg);
189 FreeStrBuf(&Msg->Message);
190 FreeStrBuf(&Msg->MsgGUID);
/*
 * AbortNetworkSaveMessage() - IO.ShutdownAbort callback for save jobs (see
 * RSSAddSaveMessage()). The visible body only reports eAbort; the TODO marks
 * that no job cleanup is done here yet.
 */
194 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
196 return eAbort; ///TODO
/*
 * RSSSaveMessage() - step in the NextDBOperation() chain that stores the item
 * currently in Ctx->ThisMsg.
 *
 * Moves the accumulated body buffer into the message's 'M' field (via
 * SmashStrBuf) and submits the message to the recipients in Ctx->recp. It
 * then records the item's GUID in CDB_USETABLE so the item is not stored
 * again on a later poll. If Ctx->Messages holds another item, Ctx->ThisMsg is
 * advanced to it and the next use-table lookup is queued. The path taken once
 * the list is exhausted is not visible in this listing.
 */
199 eNextState RSSSaveMessage(AsyncIO *IO)
203 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
205 Ctx->ThisMsg->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message);
207 CtdlSubmitMsg(Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0);
209 /* write the uidl to the use table so we don't store this item again */
210 cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
212 if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
213 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
/*
 * RSS_FetchNetworkUsetableEntry() - step in the NextDBOperation() chain that
 * checks whether the item in Ctx->ThisMsg has already been stored.
 *
 * Fills Ctx->ThisMsg->ut with the item's GUID and the current time, then looks
 * the GUID up in CDB_USETABLE. For an already-seen item it logs, rewrites the
 * use-table record to refresh its timestamp, and moves on to the next item in
 * Ctx->Messages. The final NextDBOperation(IO, RSSSaveMessage) queues the save
 * of a new item. The test on cdbut and the surrounding branch structure are
 * on lines not present in this listing.
 */
218 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
222 struct cdbdata *cdbut;
223 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
226 /* Find out if we've already seen this item */
/*
 * NOTE(review): unbounded strcpy() of a feed-supplied GUID into ut_msgid
 * (presumably a fixed-size array in struct UseTable). A long GUID overflows
 * it; a bounded copy such as
 * safestrncpy(..., sizeof(Ctx->ThisMsg->ut.ut_msgid)) is needed.
 */
227 strcpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
228 Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
230 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
233 /* Item has already been seen */
234 EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
237 /* rewrite the record anyway, to update the timestamp */
238 cdb_store(CDB_USETABLE,
239 SKEY(Ctx->ThisMsg->MsgGUID),
240 &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
242 if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
243 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
/*
 * NOTE(review): as listed, NextDBOperation()'s result is not returned here;
 * confirm the function returns an eNextState on this path.
 */
250 NextDBOperation(IO, RSSSaveMessage);
/*
 * RSSAddSaveMessage() - queue a standalone save job for one message.
 *
 * Allocates a zeroed networker_save_message and takes ownership of MsgGUID and
 * MessageBody; the Terminate callback FreeNetworkSaveMessage() frees both.
 * The job gets its own clone of rss_CC and is handed to QueueDBOperation(),
 * starting with RSS_FetchNetworkUsetableEntry(). The statements executed
 * under RSSQueueMutex and the remaining field assignments are not visible in
 * this listing. Because FreeNetworkSaveMessage() decrements Cfg->RefCount,
 * the locked section presumably increments it; confirm.
 *
 * NOTE(review): the parameter type "rss_aggregat" looks like a typo for
 * rss_aggregator, which is used everywhere else in this file.
 * NOTE(review): not called anywhere in the visible code; rss_save_item() now
 * collects items in Cfg->Messages instead.
 * NOTE(review): the malloc() result is used without a NULL check.
 */
255 void RSSAddSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregat *Cfg)
257 networker_save_message *Ctx;
259 pthread_mutex_lock(&RSSQueueMutex);
261 pthread_mutex_unlock(&RSSQueueMutex);
264 Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
265 memset(Ctx, 0, sizeof(networker_save_message));
267 Ctx->MsgGUID = MsgGUID;
268 Ctx->Message = MessageBody;
273 Ctx->IO.CitContext = CloneContext(&rss_CC);
274 Ctx->IO.Terminate = FreeNetworkSaveMessage;
275 Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
276 QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
281 * Commit a fetched and parsed RSS item to disk
/*
 * rss_save_item() - turn one parsed feed item into a CtdlMessage plus an HTML
 * body, and append it to Cfg->Messages. RSS_FetchNetworkUsetableEntry() and
 * RSSSaveMessage() later walk that list to check and save each item.
 *
 * The item's StrBufs are normalized in place, and some of them (the author
 * fields) are handed over via SmashStrBuf(). Several lines of this function
 * are not present in this listing: declarations, else branches, frees and
 * #endif lines.
 */
283 void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
286 struct MD5Context md5context;
287 u_char rawdigest[MD5_DIGEST_LEN];
288 struct CtdlMessage *msg;
// IO is presumably needed by the EVM_syslog() logging macro below; confirm.
292 AsyncIO *IO = &Cfg->IO;
296 /* Construct a GUID to use in the S_USETABLE table.
297 * If one is not present in the item itself, make one up.
299 if (ri->guid != NULL) {
300 StrBufSpaceToBlank(ri->guid);
301 StrBufTrim(ri->guid);
302 guid = NewStrBufPlain(HKEY("rss/"));
303 StrBufAppendBuf(guid, ri->guid, 0);
// Fallback when the item carries no GUID (the "else" line is not in this
// listing): hash title and link with MD5 and use "<hex digest>_rss2ctdl".
306 MD5Init(&md5context);
307 if (ri->title != NULL) {
308 MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
310 if (ri->link != NULL) {
311 MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
313 MD5Final(rawdigest, &md5context);
// Capacity: two hex characters per digest byte plus room for the
// 9-character "_rss2ctdl" suffix.
314 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
315 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
316 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
319 /* translate Item into message. */
320 EVM_syslog(LOG_DEBUG, "RSS: translating item...\n");
// A missing description is normalized to "" so the appends below always
// have a buffer.
321 if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
322 StrBufSpaceToBlank(ri->description);
// NOTE(review): malloc() results here and below are not NULL-checked.
323 msg = malloc(sizeof(struct CtdlMessage));
324 memset(msg, 0, sizeof(struct CtdlMessage));
325 msg->cm_magic = CTDLMESSAGE_MAGIC;
326 msg->cm_anon_type = MES_NORMAL;
327 msg->cm_format_type = FMT_RFC822;
// 'E' keeps a copy of the feed-supplied GUID (only when the item had one).
329 if (ri->guid != NULL) {
330 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
// Author: converted from HTML to plain text and trimmed, then used to fill
// 'A' and 'P'. With no '@' in the name and a non-empty author_email, 'A' is
// the RFC 2047 encoded name and 'P' the e-mail. The other two assignment
// pairs below belong to branches whose conditions are not in this listing
// (presumably: a name containing '@' is used for both 'A' and 'P'; otherwise
// the encoded name is paired with "rss@localhost").
// NOTE(review): freeing of the html_to_ascii() result (From) is not visible.
333 if (ri->author_or_creator != NULL) {
335 StrBuf *Encoded = NULL;
338 From = html_to_ascii(ChrPtr(ri->author_or_creator),
339 StrLength(ri->author_or_creator),
341 StrBufPlain(ri->author_or_creator, From, -1);
342 StrBufTrim(ri->author_or_creator);
345 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
346 if (!FromAt && StrLength (ri->author_email) > 0)
348 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
349 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
350 msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
356 msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator);
357 msg->cm_fields['P'] = strdup(msg->cm_fields['A']);
361 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
362 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
363 msg->cm_fields['P'] = strdup("rss@localhost");
// Items without a usable publication date (<= 0) get the current time.
366 if (ri->pubdate <= 0) {
367 ri->pubdate = time(NULL);
// Presumably the branch for items without author_or_creator (the enclosing
// "else" is not in this listing).
372 msg->cm_fields['A'] = strdup("rss");
375 msg->cm_fields['N'] = strdup(NODENAME);
// Subject ('U'): the title converted from HTML to plain text (html_to_ascii
// with 512 as its third argument), a trailing newline presumably stripped,
// then RFC 2047 encoded.
376 if (ri->title != NULL) {
379 StrBuf *Encoded, *QPEncoded;
382 StrBufSpaceToBlank(ri->title);
383 len = StrLength(ri->title);
384 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
// NOTE(review): nothing in this listing guarantees len > 0 before
// Sbj[len - 1]; an empty title would read Sbj[-1]. Also confirm that len is
// recomputed from Sbj (not left as the title's length) before indexing.
386 if (Sbj[len - 1] == '\n')
391 Encoded = NewStrBufPlain(Sbj, len);
395 StrBufRFC2047encode(&QPEncoded, Encoded);
397 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
398 FreeStrBuf(&Encoded);
// 'T' holds the publication time as a decimal string.
// NOTE(review): "%ld" assumes pubdate is a long; if it is a time_t, cast it
// explicitly for portability.
400 msg->cm_fields['T'] = malloc(64);
401 snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
// 'O' receives the channel title when it is non-empty.
402 if (ri->channel_title != NULL) {
403 if (StrLength(ri->channel_title) > 0) {
404 msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
// A missing link is normalized to "" (AppendLink then emits nothing for it).
407 if (ri->link == NULL)
408 ri->link = NewStrBufPlain(HKEY(""));
410 #if 0 /* temporarily disable shorter urls. */
411 msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
414 msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
// Body: a text/html (UTF-8) part holding the description, followed by the
// item link and, when present, a "Reply to this" link.
416 Message = NewStrBufPlain(NULL, StrLength(ri->description));
418 StrBufPlain(Message, HKEY(
419 "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
421 #if 0 /* disable shorter url for now. */
422 msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
424 StrBufAppendBuf(Message, ri->description, 0);
425 StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
427 AppendLink(Message, ri->link, ri->linkTitle, NULL);
428 AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
429 StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
// Queue the result in Cfg->Messages under key GetCount()+1; ownership of guid
// and Message moves to SaveMsg. Whether msg itself is attached to SaveMsg is
// decided on lines not in this listing.
// NOTE(review): FreeNetworkSaveMessage is defined twice in this file with
// different signatures; the destructor intended here is the (void *) one.
433 networker_save_message *SaveMsg;
435 SaveMsg = (networker_save_message *) malloc(sizeof(networker_save_message));
436 memset(SaveMsg, 0, sizeof(networker_save_message));
438 SaveMsg->MsgGUID = guid;
439 SaveMsg->Message = Message;
442 n = GetCount(Cfg->Messages) + 1;
443 Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage);
/*
 * rss_do_fetching() - start an asynchronous HTTP fetch of Cfg's feed.
 *
 * Skips the fetch while Cfg->next_poll is set and not yet reached. Otherwise
 * it:
 *  - allocates a zeroed rss_item (presumably stored as Cfg->Item, which
 *    DeleteRssCfg() frees; the assignment is not visible);
 *  - gives the IO its own clone of rss_CC;
 *  - parses Cfg->Url, passing 80 as the port argument;
 *  - initializes a libcurl request with the "Citadel RSS Client" user agent
 *    and the RSSAggregatorTerminate / RSSAggregatorShutdownAbort callbacks;
 *  - queues the request.
 * rssclient_scan() treats a zero return as failure and unlinks Cfg. The
 * actual return statements are not visible in this listing.
 *
 * NOTE(review): the rss_item malloc() result is not NULL-checked.
 */
451 int rss_do_fetching(rss_aggregator *Cfg)
460 if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
463 ri = (rss_item*) malloc(sizeof(rss_item));
464 memset(ri, 0, sizeof(rss_item));
467 IO->CitContext = CloneContext(&rss_CC);
470 safestrncpy(((CitContext*)IO->CitContext)->cs_host,
472 sizeof(((CitContext*)IO->CitContext)->cs_host));
474 syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
475 ParseURL(&IO->ConnectMe, Cfg->Url, 80);
476 CurlPrepareURL(IO->ConnectMe);
478 if (! evcurl_init(IO,
481 "Citadel RSS Client",
483 RSSAggregatorTerminate,
484 RSSAggregatorShutdownAbort))
486 syslog(LOG_DEBUG, "Unable to initialize libcurl.\n");
490 QueueCurlContext(IO);
/*
 * DeleteRssCfg() - hash payload destructor for RSSFetchUrls entries
 * (registered with Put() in rssclient_scan_room()).
 *
 * Frees:
 *  - the feed's string buffers and the HTTP reply buffer;
 *  - the extra-room hash and the parsed URL;
 *  - the message iterator and the message list, whose entries are released by
 *    the destructor given to Put() in rss_save_item();
 *  - the recipient room string;
 *  - every StrBuf of the in-progress item.
 * Freeing of the rss_item itself and of rncptr happens on lines not present
 * in this listing.
 */
495 void DeleteRssCfg(void *vptr)
497 rss_aggregator *rncptr = (rss_aggregator *)vptr;
/* IO is presumably needed by the EVM_syslog() logging macro; confirm. */
498 AsyncIO *IO = &rncptr->IO;
499 EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
501 FreeStrBuf(&rncptr->Url);
502 FreeStrBuf(&rncptr->rooms);
503 FreeStrBuf(&rncptr->CData);
504 FreeStrBuf(&rncptr->Key);
505 FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
506 DeleteHash(&rncptr->OtherQRnumbers);
507 FreeURL(&rncptr->IO.ConnectMe);
509 DeleteHashPos (&rncptr->Pos);
510 DeleteHash (&rncptr->Messages);
511 if (rncptr->recp.recp_room != NULL)
512 free(rncptr->recp.recp_room);
515 if (rncptr->Item != NULL)
517 FreeStrBuf(&rncptr->Item->guid);
518 FreeStrBuf(&rncptr->Item->title);
519 FreeStrBuf(&rncptr->Item->link);
520 FreeStrBuf(&rncptr->Item->linkTitle);
521 FreeStrBuf(&rncptr->Item->reLink);
522 FreeStrBuf(&rncptr->Item->reLinkTitle);
523 FreeStrBuf(&rncptr->Item->description);
524 FreeStrBuf(&rncptr->Item->channel_title);
525 FreeStrBuf(&rncptr->Item->author_or_creator);
526 FreeStrBuf(&rncptr->Item->author_url);
527 FreeStrBuf(&rncptr->Item->author_email);
/*
 * RSSAggregatorTerminate() - terminate callback passed to evcurl_init() in
 * rss_do_fetching(). Logs, then removes the aggregator (IO->Data) from
 * RSSFetchUrls. The return value is not visible in this listing.
 * NOTE(review): no RSSQueueMutex lock is visible around UnlinkRSSAggregator()
 * here, unlike its other callers; the omitted lines may take it, so confirm.
 */
534 eNextState RSSAggregatorTerminate(AsyncIO *IO)
536 rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
538 EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
541 UnlinkRSSAggregator(rncptr);
/*
 * RSSAggregatorShutdownAbort() - shutdown-abort callback passed to
 * evcurl_init() in rss_do_fetching(). Logs the feed URL being aborted (taken
 * from IO->ConnectMe->PlainUrl) and removes the aggregator from RSSFetchUrls.
 * Any fallback for a NULL URL, and the return value, are on lines not shown.
 * NOTE(review): no RSSQueueMutex lock is visible around UnlinkRSSAggregator()
 * here; confirm whether the omitted lines take it.
 */
544 eNextState RSSAggregatorShutdownAbort(AsyncIO *IO)
547 rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
549 pUrl = IO->ConnectMe->PlainUrl;
553 EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
556 UnlinkRSSAggregator(rncptr);
561 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
/*
 * rssclient_scan_room() - CtdlForEachRoom() callback (see rssclient_scan())
 * that registers this room's "rssclient" netconfig entries.
 *
 * 1. If the room is already in RSSQueueRooms, it logs "already in progress"
 *    and presumably returns.
 * 2. Otherwise it reads the room's netconfig file. Each line whose first
 *    '|'-separated token is "rssclient" supplies the feed URL as its second
 *    token.
 * 3. If an aggregator for that URL is already in RSSFetchUrls, this room is
 *    attached to it: the name is appended to ->rooms and the number is stored
 *    in ->OtherQRnumbers. Otherwise a new rss_aggregator is registered for
 *    the URL.
 * 4. Finally the room is recorded in RSSQueueRooms with an rss_room_counter.
 *
 * Bails out whenever server_shutting_down is set. Several lines
 * (declarations, returns, loop braces) are not present in this listing.
 */
563 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
565 StrBuf *CfgData=NULL;
568 rss_room_counter *Count = NULL;
570 char filename[PATH_MAX];
573 rss_aggregator *rncptr = NULL;
574 rss_aggregator *use_this_rncptr = NULL;
576 const char *CfgPtr, *lPtr;
579 pthread_mutex_lock(&RSSQueueMutex);
580 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
583 "rssclient: [%ld] %s already in progress.\n",
586 pthread_mutex_unlock(&RSSQueueMutex);
589 pthread_mutex_unlock(&RSSQueueMutex);
591 assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
593 if (server_shutting_down)
596 /* Only do net processing for rooms that have netconfigs */
597 fd = open(filename, 0);
599 //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
603 if (server_shutting_down)
606 if (fstat(fd, &statbuf) == -1) {
607 syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
608 filename, strerror(errno));
612 if (server_shutting_down)
615 CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
617 if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
619 FreeStrBuf(&CfgData);
620 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
621 filename, strerror(errno));
625 if (server_shutting_down)
629 CfgType = NewStrBuf();
630 Line = NewStrBufPlain(NULL, StrLength(CfgData));
634 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
635 if (StrLength(Line) > 0)
638 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
639 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
/*
 * NOTE(review): the malloc() results for Count and rncptr are not
 * NULL-checked. Whether Count is allocated only once per room (rather than
 * once per rssclient line) is decided on lines not shown.
 */
643 Count = malloc(sizeof(rss_room_counter));
647 rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
648 memset (rncptr, 0, sizeof(rss_aggregator));
649 rncptr->roomlist_parts = 1;
650 rncptr->Url = NewStrBuf();
651 StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
653 pthread_mutex_lock(&RSSQueueMutex);
/*
 * NOTE(review): GetHash()'s result is ignored and vptr is read regardless.
 * This relies on GetHash() clearing vptr on a miss; otherwise a stale pointer
 * from the earlier RSSQueueRooms lookup would be reused. Confirm, or set
 * vptr = NULL before the call.
 */
654 GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
655 use_this_rncptr = (rss_aggregator *)vptr;
656 if (use_this_rncptr != NULL)
659 StrBufAppendBufPlain(use_this_rncptr->rooms,
/* first extra room for this URL: create the hash of additional room numbers */
662 if (use_this_rncptr->roomlist_parts == 1)
664 use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
666 QRnumber = (long*)malloc(sizeof(long));
667 *QRnumber = qrbuf->QRnumber;
668 Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
669 use_this_rncptr->roomlist_parts++;
671 pthread_mutex_unlock(&RSSQueueMutex);
/* URL already handled by use_this_rncptr: discard the duplicate (remaining cleanup not visible) */
673 FreeStrBuf(&rncptr->Url);
678 pthread_mutex_unlock(&RSSQueueMutex);
680 rncptr->ItemType = RSS_UNSET;
682 rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
684 pthread_mutex_lock(&RSSQueueMutex);
685 Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
686 pthread_mutex_unlock(&RSSQueueMutex);
692 Count->QRnumber = qrbuf->QRnumber;
693 pthread_mutex_lock(&RSSQueueMutex);
694 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
695 qrbuf->QRnumber, qrbuf->QRname);
696 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
697 pthread_mutex_unlock(&RSSQueueMutex);
699 FreeStrBuf(&CfgData);
700 FreeStrBuf(&CfgType);
705 * Scan for rooms that have RSS client requests configured
/*
 * rssclient_scan() - EVT_TIMER session hook (registered in the module init)
 * that looks for rooms with RSS client requests and starts their fetches.
 *
 * Does nothing if less than 900 seconds (15 minutes) have passed since
 * last_run, or while doing_rssclient is set. When RSSQueueRooms or
 * RSSFetchUrls still hold entries, the body of that check (not visible)
 * presumably skips this run. Otherwise every room is scanned with
 * rssclient_scan_room(). Then, under RSSQueueMutex, a fetch is started for
 * each queued aggregator, and aggregators for which rss_do_fetching() returns
 * 0 are unlinked immediately.
 *
 * NOTE(review): setting and clearing doing_rssclient is not visible in this
 * listing. A plain static int is also not a reliable guard if this hook can
 * run on more than one thread at once.
 */
707 void rssclient_scan(void) {
708 static int doing_rssclient = 0;
709 rss_aggregator *rptr = NULL;
715 /* Run no more than once every 15 minutes. */
716 if ((time(NULL) - last_run) < 900) {
721 * This is a simple concurrency check to make sure only one rssclient run
722 * is done at a time. We could do this with a mutex, but since we
723 * don't really require extremely fine granularity here, we'll do it
724 * with a static variable instead.
726 if (doing_rssclient) return;
/* NOTE(review): both counts are read without holding RSSQueueMutex. */
728 if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
731 syslog(LOG_DEBUG, "rssclient started\n");
732 CtdlForEachRoom(rssclient_scan_room, NULL);
734 pthread_mutex_lock(&RSSQueueMutex);
736 it = GetNewHashPos(RSSFetchUrls, 0);
737 while (!server_shutting_down &&
738 GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
740 rptr = (rss_aggregator *)vrptr;
/*
 * NOTE(review): UnlinkRSSAggregator() deletes the current entry from
 * RSSFetchUrls while "it" is iterating that same hash. Confirm the HashPos
 * iterator tolerates deletion between GetNextHashPos() calls.
 */
741 if (!rss_do_fetching(rptr))
742 UnlinkRSSAggregator(rptr);
745 pthread_mutex_unlock(&RSSQueueMutex);
747 syslog(LOG_DEBUG, "rssclient ended\n");
/*
 * rss_cleanup() - module cleanup hook: destroys the feed and room queues.
 * Deleting RSSFetchUrls runs DeleteRssCfg(), the destructor registered at
 * Put() time, on every remaining aggregator. RSSQueueMutex is not destroyed
 * (see the TODO).
 */
752 void rss_cleanup(void)
754 /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
755 DeleteHash(&RSSFetchUrls);
756 DeleteHash(&RSSQueueRooms);
/*
 * Module initialization. It:
 *  - prepares the rss_CC system context template and the queue mutex;
 *  - creates the two queue hashes (RSSQueueRooms uses lFlathash for its long
 *    room-number keys; RSSFetchUrls is keyed by URL string);
 *  - logs the libcurl version;
 *  - registers rssclient_scan() as a timer session hook and rss_cleanup() as
 *    the cleanup hook.
 * The opening lines and the return value of this body are not present in
 * this listing.
 */
760 CTDL_MODULE_INIT(rssclient)
764 CtdlFillSystemContext(&rss_CC, "rssclient");
765 pthread_mutex_init(&RSSQueueMutex, NULL);
766 RSSQueueRooms = NewHash(1, lFlathash);
767 RSSFetchUrls = NewHash(1, NULL);
768 syslog(LOG_INFO, "%s\n", curl_version());
769 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
770 CtdlRegisterCleanupHook(rss_cleanup);