2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2010 by the citadel.org team
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/types.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
46 #include "citserver.h"
50 #include "ctdl_module.h"
52 #include "parsedate.h"
54 #include "citadel_dirs.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
// Temporary cm_fields[] slot indices (0xFD-0xFF). TMP_SHORTER_URLS and
// TMP_SHORTER_URL_OFFSET are only used inside "#if 0" sections of
// rss_save_item(); TMP_MSGDATA is not referenced in the code shown here.
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
// RSSQueueMutex guards RSSQueueRooms, RSSFetchUrls and every aggregator's
// RefCount (see the trailing comments below); it is initialised in the
// module init function.
65 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
// Rooms with an RSS scan in progress, keyed by room number (LKEY(QRnumber));
// values are rss_room_counter.
66 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
// Feeds to poll, keyed by URL string (SKEY(Url)); values are rss_aggregator,
// inserted with DeleteRssCfg() as their destructor.
67 HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */
// Forward declaration: passed to evcurl_init() by rss_do_fetching().
69 eNextState RSSAggregatorTerminate(AsyncIO *IO);
// System context of this module; async jobs run in a CloneContext() of it.
71 struct CitContext rss_CC;
// NOTE(review): rnclist is not referenced in the code shown; possibly a leftover.
73 struct rssnetcfg *rnclist = NULL;
74 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
76 if (StrLength(link) > 0)
78 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
79 StrBufAppendBuf(Message, link, 0);
80 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
81 if (StrLength(LinkTitle) > 0)
82 StrBufAppendBuf(Message, LinkTitle, 0);
83 else if ((Title != NULL) && !IsEmptyStr(Title))
84 StrBufAppendBufPlain(Message, Title, -1, 0);
86 StrBufAppendBuf(Message, link, 0);
87 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
// Per-item context of the asynchronous "save one RSS item" DB job created
// by RSSQueueSaveMessage(). Besides Msg and recp, the code in this file
// also uses the IO, Cfg, MsgGUID, Message and ut members of this struct.
90 typedef struct __networker_save_message {
// Message to submit; freed with CtdlFreeMessage() in FreeNetworkSaveMessage().
92 struct CtdlMessage *Msg;
// Destination rooms; freed with free_recipients() in FreeNetworkSaveMessage().
93 struct recptypes *recp;
98 } networker_save_message;
// Look up the rss_room_counter stored under QRnumber in RSSQueueRooms and
// remove that hash entry once its count is zero.
// NOTE(review): RSSQueueRooms is documented as guarded by RSSQueueMutex;
// callers are presumably expected to hold it -- confirm.
// NOTE(review): the result of GetHashPosFromKey() is not checked, so if
// QRnumber is absent, GetHashPos() may hand back an unrelated entry.
101 void DeleteRoomReference(long QRnumber)
107 rss_room_counter *pRoomC;
109 At = GetNewHashPos(RSSQueueRooms, 0);
// Position At on this room's entry, then fetch its value.
111 GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At);
112 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
115 pRoomC = (rss_room_counter *) vData;
// Count reached zero: drop the room from the in-progress table.
117 if (pRoomC->count == 0)
118 DeleteEntryFromHash(RSSQueueRooms, At);
// Release the RSSQueueRooms references held by aggregator Cfg: first its
// primary room (Cfg->QRnumber), then every room number stored in
// Cfg->OtherQRnumbers (filled by rssclient_scan_room() when several rooms
// subscribe to the same URL).
123 void UnlinkRooms(rss_aggregator *Cfg)
126 DeleteRoomReference(Cfg->QRnumber);
127 if (Cfg->OtherQRnumbers != NULL)
// OtherQRnumbers values are heap-allocated longs holding room numbers.
134 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
135 while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) &&
138 long *lData = (long*) vData;
139 DeleteRoomReference(*lData);
142 if (server_shutting_down)
// Remove aggregator Cfg from RSSFetchUrls, looking it up by Cfg->Url.
// RSSFetchUrls entries were inserted with DeleteRssCfg() as destructor, so
// removing the entry presumably frees Cfg itself; callers should not use
// Cfg afterwards. All callers visible in this file hold RSSQueueMutex.
149 void UnlinkRSSAggregator(rss_aggregator *Cfg)
155 At = GetNewHashPos(RSSFetchUrls, 0);
// NOTE(review): the entry is deleted only when GetHashPosFromKey() returns
// 0. If libcitadel's GetHashPosFromKey() returns nonzero when the key is
// found, this condition is inverted and the wrong entry (or none) is
// deleted -- verify against libcitadel.
156 if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At) == 0)
158 DeleteEntryFromHash(RSSFetchUrls, At);
// Terminate hook of a save-message job (installed by RSSQueueSaveMessage()).
// Under RSSQueueMutex it decrements the aggregator's RefCount and unlinks
// the aggregator from RSSFetchUrls when the count drops to zero. It then
// frees the message, the recipient list and the body/GUID buffers of the job.
163 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
165 networker_save_message *Ctx = (networker_save_message *) IO->Data;
167 pthread_mutex_lock(&RSSQueueMutex);
168 Ctx->Cfg->RefCount --;
// NOTE(review): UnlinkRSSAggregator() removes Cfg from RSSFetchUrls, whose
// destructor is DeleteRssCfg(); Ctx->Cfg is presumably dangling afterwards.
170 if (Ctx->Cfg->RefCount == 0)
172 UnlinkRSSAggregator(Ctx->Cfg);
175 pthread_mutex_unlock(&RSSQueueMutex);
177 CtdlFreeMessage(Ctx->Msg);
178 free_recipients(Ctx->recp);
179 FreeStrBuf(&Ctx->Message);
180 FreeStrBuf(&Ctx->MsgGUID);
185 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
187 return eAbort; ///TODO
190 eNextState RSSSaveMessage(AsyncIO *IO)
192 networker_save_message *Ctx = (networker_save_message *) IO->Data;
194 Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message);
196 CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0);
198 /* write the uidl to the use table so we don't store this item again */
199 cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) );
201 return eTerminateConnection;
204 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
206 struct cdbdata *cdbut;
207 networker_save_message *Ctx = (networker_save_message *) IO->Data;
209 /* Find out if we've already seen this item */
210 strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
211 Ctx->ut.ut_timestamp = time(NULL);
213 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
216 /* Item has already been seen */
217 syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
220 /* rewrite the record anyway, to update the timestamp */
221 cdb_store(CDB_USETABLE,
223 &Ctx->ut, sizeof(struct UseTable) );
229 NextDBOperation(IO, RSSSaveMessage);
// Wrap one translated RSS item in a networker_save_message job and queue it
// on the DB queue, starting with RSS_FetchNetworkUsetableEntry() (the
// "already seen" check). The job keeps MsgGUID and MessageBody.
// FreeNetworkSaveMessage() later frees Ctx->Msg, Ctx->recp, Ctx->Message
// and Ctx->MsgGUID when the job terminates.
// NOTE(review): the malloc() result is used without a NULL check.
233 void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregator *Cfg)
235 networker_save_message *Ctx;
237 Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
238 memset(Ctx, 0, sizeof(networker_save_message));
240 Ctx->MsgGUID = MsgGUID;
241 Ctx->Message = MessageBody;
// The job runs in a clone of the module's system context.
246 Ctx->IO.CitContext = CloneContext(&rss_CC);
// Cleanup and shutdown-abort hooks of the job.
247 Ctx->IO.Terminate = FreeNetworkSaveMessage;
248 Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
249 QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
254 * Commit a fetched and parsed RSS item to disk
// rss_save_item(): translate the parsed item ri into a Citadel message for
// the aggregator's rooms (Cfg->rooms). The message is handed to
// RSSQueueSaveMessage(), which checks the use table and stores the message
// asynchronously. Several StrBuf fields of ri are trimmed, re-encoded or
// consumed (smashed into message fields) along the way.
256 void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
259 struct MD5Context md5context;
260 u_char rawdigest[MD5_DIGEST_LEN];
261 struct CtdlMessage *msg;
262 struct recptypes *recp = NULL;
// Recipients: a copy of Cfg->rooms covering Cfg->roomlist_parts rooms.
268 recp = (struct recptypes *) malloc(sizeof(struct recptypes));
269 if (recp == NULL) return;
270 memset(recp, 0, sizeof(struct recptypes));
271 Buf = NewStrBufDup(Cfg->rooms);
272 recp->recp_room = SmashStrBuf(&Buf);
273 recp->num_room = Cfg->roomlist_parts;
274 recp->recptypes_magic = RECPTYPES_MAGIC;
277 /* Construct a GUID to use in the S_USETABLE table.
278 * If one is not present in the item itself, make one up.
280 if (ri->guid != NULL) {
281 StrBufSpaceToBlank(ri->guid);
282 StrBufTrim(ri->guid);
283 guid = NewStrBufPlain(HKEY("rss/"));
284 StrBufAppendBuf(guid, ri->guid, 0);
// Fallback GUID: hex MD5 over title and link, suffixed with "_rss2ctdl".
287 MD5Init(&md5context);
288 if (ri->title != NULL) {
289 MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
291 if (ri->link != NULL) {
292 MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
294 MD5Final(rawdigest, &md5context);
295 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
296 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
297 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
300 /* translate Item into message. */
301 syslog(LOG_DEBUG, "RSS: translating item...\n");
302 if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
303 StrBufSpaceToBlank(ri->description);
// NOTE(review): unlike recp above, this malloc() result is not checked.
304 msg = malloc(sizeof(struct CtdlMessage));
305 memset(msg, 0, sizeof(struct CtdlMessage));
306 msg->cm_magic = CTDLMESSAGE_MAGIC;
307 msg->cm_anon_type = MES_NORMAL;
308 msg->cm_format_type = FMT_RFC822;
310 if (ri->guid != NULL) {
311 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
// Author handling: the author is converted from HTML to ASCII and trimmed.
// Without an '@' but with a separate author_email, 'A' gets the
// RFC2047-encoded name and 'P' the email address. The two other branches
// presumably choose by FromAt: the raw author is used for both 'A' and 'P',
// or the encoded name is used with 'P' = "rss@localhost". Items without an
// author get 'A' = "rss".
314 if (ri->author_or_creator != NULL) {
316 StrBuf *Encoded = NULL;
319 From = html_to_ascii(ChrPtr(ri->author_or_creator),
320 StrLength(ri->author_or_creator),
322 StrBufPlain(ri->author_or_creator, From, -1);
323 StrBufTrim(ri->author_or_creator);
326 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
327 if (!FromAt && StrLength (ri->author_email) > 0)
329 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
330 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
331 msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
337 msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator);
338 msg->cm_fields['P'] = strdup(msg->cm_fields['A']);
342 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
343 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
344 msg->cm_fields['P'] = strdup("rss@localhost");
// NOTE(review): this pubdate fallback appears to sit inside the author
// handling, so an item without an author may keep pubdate <= 0 when 'T'
// is formatted below -- confirm this is intended.
347 if (ri->pubdate <= 0) {
348 ri->pubdate = time(NULL);
353 msg->cm_fields['A'] = strdup("rss");
356 msg->cm_fields['N'] = strdup(NODENAME);
357 if (ri->title != NULL) {
360 StrBuf *Encoded, *QPEncoded;
363 StrBufSpaceToBlank(ri->title);
364 len = StrLength(ri->title);
365 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
// NOTE(review): Sbj[len - 1] is out of bounds when len is 0 (for example an
// empty title); confirm that len > 0 is guaranteed here.
367 if (Sbj[len - 1] == '\n')
372 Encoded = NewStrBufPlain(Sbj, len);
376 StrBufRFC2047encode(&QPEncoded, Encoded);
378 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
379 FreeStrBuf(&Encoded);
// 'T': posting time as a decimal string. NOTE(review): "%ld" assumes that
// pubdate has (or is compatible with) type long.
381 msg->cm_fields['T'] = malloc(64);
382 snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
383 if (ri->channel_title != NULL) {
384 if (StrLength(ri->channel_title) > 0) {
385 msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
388 if (ri->link == NULL)
389 ri->link = NewStrBufPlain(HKEY(""));
391 #if 0 /* temporarily disable shorter urls. */
392 msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
395 msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
// Body: a Content-type header, then the description, the item link and an
// optional "Reply to this" link, closed by "</body></html>".
397 Message = NewStrBufPlain(NULL, StrLength(ri->description));
399 StrBufPlain(Message, HKEY(
400 "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
402 #if 0 /* disable shorter url for now. */
403 msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
405 StrBufAppendBuf(Message, ri->description, 0);
406 StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
408 AppendLink(Message, ri->link, ri->linkTitle, NULL);
409 AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
410 StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
// msg, recp, guid and Message now belong to the queued job; they are freed
// by FreeNetworkSaveMessage().
412 RSSQueueSaveMessage(msg, recp, guid, Message, Cfg);
// Start an asynchronous HTTP fetch of Cfg->Url through the evcurl helpers.
// Returns early while Cfg->next_poll is set and still in the future.
// rssclient_scan() calls this with RSSQueueMutex held. It unlinks the
// aggregator when the return value is 0, so 0 signals failure (presumably
// including the libcurl initialisation failure logged below).
420 int rss_do_fetching(rss_aggregator *Cfg)
429 if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
// Fresh, zeroed rss_item for the parser. NOTE(review): malloc() unchecked.
433 ri = (rss_item*) malloc(sizeof(rss_item));
434 memset(ri, 0, sizeof(rss_item));
437 IO->CitContext = CloneContext(&rss_CC);
441 syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
// 80 is presumably the default port when the URL does not name one.
442 ParseURL(&IO->ConnectMe, Cfg->Url, 80);
443 CurlPrepareURL(IO->ConnectMe);
// RSSAggregatorTerminate() is registered as the terminate hook of the fetch.
445 if (! evcurl_init(IO,
448 "Citadel RSS Client",
450 RSSAggregatorTerminate))
452 syslog(LOG_DEBUG, "Unable to initialize libcurl.\n");
456 evcurl_handle_start(IO);
// Destructor for rss_aggregator values. It is registered with Put() on
// RSSFetchUrls in rssclient_scan_room(), which also calls it directly to
// discard a duplicate aggregator. It frees the aggregator's StrBufs, the
// OtherQRnumbers hash, the parsed URL and the StrBuf fields of the current
// Item.
461 void DeleteRssCfg(void *vptr)
463 rss_aggregator *rncptr = (rss_aggregator *)vptr;
465 FreeStrBuf(&rncptr->Url);
466 FreeStrBuf(&rncptr->rooms);
467 FreeStrBuf(&rncptr->CData);
468 FreeStrBuf(&rncptr->Key);
469 FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
470 DeleteHash(&rncptr->OtherQRnumbers);
471 FreeURL(&rncptr->IO.ConnectMe);
// Item-level buffers filled in while parsing the feed.
473 if (rncptr->Item != NULL)
475 FreeStrBuf(&rncptr->Item->guid);
476 FreeStrBuf(&rncptr->Item->title);
477 FreeStrBuf(&rncptr->Item->link);
478 FreeStrBuf(&rncptr->Item->linkTitle);
479 FreeStrBuf(&rncptr->Item->reLink);
480 FreeStrBuf(&rncptr->Item->reLinkTitle);
481 FreeStrBuf(&rncptr->Item->description);
482 FreeStrBuf(&rncptr->Item->channel_title);
483 FreeStrBuf(&rncptr->Item->author_or_creator);
484 FreeStrBuf(&rncptr->Item->author_url);
485 FreeStrBuf(&rncptr->Item->author_email);
// Terminate hook of an aggregator's HTTP fetch (given to evcurl_init() in
// rss_do_fetching()). It unlinks the aggregator when its RefCount is 0 and
// then removes the URL's entry from RSSFetchUrls.
// NOTE(review): when RefCount is 0, UnlinkRSSAggregator() already deletes
// the RSSFetchUrls entry, whose destructor is DeleteRssCfg(). The second
// lookup below then reads rncptr->Url and deletes an entry again. Unless
// control leaves the function in between, this is a use-after-free -- verify.
492 eNextState RSSAggregatorTerminate(AsyncIO *IO)
494 rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
501 pthread_mutex_lock(&RSSQueueMutex);
503 if (rncptr->RefCount == 0)
505 UnlinkRSSAggregator(rncptr);
508 pthread_mutex_unlock(&RSSQueueMutex);
// NOTE(review): the GetHashPosFromKey() result is not checked before deleting.
510 At = GetNewHashPos(RSSFetchUrls, 0);
512 pthread_mutex_lock(&RSSQueueMutex);
513 GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At);
514 GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData);
515 DeleteEntryFromHash(RSSFetchUrls, At);
516 pthread_mutex_unlock(&RSSQueueMutex);
524 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
// Callback for CtdlForEachRoom() (see rssclient_scan()). Rooms that are
// already in RSSQueueRooms are skipped. Otherwise the room's netconfig file
// is read, and every "rssclient|<url>|..." line yields a candidate
// rss_aggregator:
//  - If an idle aggregator (RefCount == 0) for the same URL already exists,
//    this room is attached to it: the room is appended to its rooms buffer
//    and the room number is added to OtherQRnumbers.
//  - An aggregator that is currently active is not touched.
//  - A new URL gets the fresh aggregator registered in RSSFetchUrls.
// The room's rss_room_counter is then entered into RSSQueueRooms.
// NOTE(review): the malloc() calls for Count, rncptr and QRnumber are unchecked.
526 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
528 StrBuf *CfgData=NULL;
531 rss_room_counter *Count = NULL;
533 char filename[PATH_MAX];
536 rss_aggregator *rncptr = NULL;
537 rss_aggregator *use_this_rncptr = NULL;
539 const char *CfgPtr, *lPtr;
// Skip rooms that already have an RSS scan in progress.
542 pthread_mutex_lock(&RSSQueueMutex);
543 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
546 "rssclient: [%ld] %s already in progress.\n",
549 pthread_mutex_unlock(&RSSQueueMutex);
552 pthread_mutex_unlock(&RSSQueueMutex);
// Path of this room's netconfig file.
554 assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
556 if (server_shutting_down)
559 /* Only do net processing for rooms that have netconfigs */
// Flags 0 is O_RDONLY on common platforms.
560 fd = open(filename, 0);
562 //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
566 if (server_shutting_down)
569 if (fstat(fd, &statbuf) == -1) {
570 syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
571 filename, strerror(errno));
575 if (server_shutting_down)
// Read the whole netconfig file into CfgData.
578 CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
580 if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
582 FreeStrBuf(&CfgData);
583 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
584 filename, strerror(errno));
588 if (server_shutting_down)
// Walk the config line by line; the first '|' token is the entry type.
592 CfgType = NewStrBuf();
593 Line = NewStrBufPlain(NULL, StrLength(CfgData));
597 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
598 if (StrLength(Line) > 0)
601 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
602 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
606 Count = malloc(sizeof(rss_room_counter));
// Candidate aggregator for the URL given in the next '|' token.
610 rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
611 memset (rncptr, 0, sizeof(rss_aggregator));
612 rncptr->roomlist_parts = 1;
613 rncptr->Url = NewStrBuf();
614 StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
// Reuse an existing aggregator for the same URL when there is one.
616 pthread_mutex_lock(&RSSQueueMutex);
617 GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
618 use_this_rncptr = (rss_aggregator *)vptr;
619 if (use_this_rncptr != NULL)
621 /* mustn't attach to an active session */
622 if (use_this_rncptr->RefCount > 0)
624 DeleteRssCfg(rncptr);
630 StrBufAppendBufPlain(use_this_rncptr->rooms,
// The second room for this URL creates the OtherQRnumbers table.
633 if (use_this_rncptr->roomlist_parts == 1)
635 use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
637 QRnumber = (long*)malloc(sizeof(long));
638 *QRnumber = qrbuf->QRnumber;
639 Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
640 use_this_rncptr->roomlist_parts++;
642 pthread_mutex_unlock(&RSSQueueMutex);
645 FreeStrBuf(&rncptr->Url);
650 pthread_mutex_unlock(&RSSQueueMutex);
// New URL: finish initialising rncptr and register it in RSSFetchUrls.
652 rncptr->ItemType = RSS_UNSET;
654 rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
656 pthread_mutex_lock(&RSSQueueMutex);
657 Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
658 pthread_mutex_unlock(&RSSQueueMutex);
// Mark this room as in progress in RSSQueueRooms.
664 Count->QRnumber = qrbuf->QRnumber;
665 pthread_mutex_lock(&RSSQueueMutex);
666 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
667 qrbuf->QRnumber, qrbuf->QRname);
668 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
669 pthread_mutex_unlock(&RSSQueueMutex);
671 FreeStrBuf(&CfgData);
672 FreeStrBuf(&CfgType);
677 * Scan for rooms that have RSS client requests configured
// rssclient_scan(): EVT_TIMER session hook (registered in the module init).
// It first walks all rooms with rssclient_scan_room(). Then, holding
// RSSQueueMutex, it starts a fetch for every aggregator in RSSFetchUrls
// whose RefCount is 0. Aggregators whose fetch could not be started are
// unlinked. The walk stops early once server_shutting_down is set.
679 void rssclient_scan(void) {
680 static int doing_rssclient = 0;
681 rss_aggregator *rptr = NULL;
// NOTE(review): the next comment is closed with '* /' (with a space), which
// does NOT end it. The 15-minute rate limit after it stays commented out
// until the next real star-slash -- verify whether disabling the throttle
// is intended.
687 /* Run no more than once every 15 minutes. * /
688 if ((time(NULL) - last_run) < 900) {
693 * This is a simple concurrency check to make sure only one rssclient run
694 * is done at a time. We could do this with a mutex, but since we
695 * don't really require extremely fine granularity here, we'll do it
696 * with a static variable instead.
698 if (doing_rssclient) return;
701 syslog(LOG_DEBUG, "rssclient started\n");
702 CtdlForEachRoom(rssclient_scan_room, NULL);
// NOTE(review): UnlinkRSSAggregator() deletes the current RSSFetchUrls entry
// while `it` is iterating over that same hash; confirm that the iterator
// tolerates this.
704 pthread_mutex_lock(&RSSQueueMutex);
706 it = GetNewHashPos(RSSFetchUrls, 0);
707 while (!server_shutting_down &&
708 GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
710 rptr = (rss_aggregator *)vrptr;
711 if (rptr->RefCount == 0)
712 if (!rss_do_fetching(rptr))
713 UnlinkRSSAggregator(rptr);
716 pthread_mutex_unlock(&RSSQueueMutex);
718 syslog(LOG_DEBUG, "rssclient ended\n");
723 void rss_cleanup(void)
725 /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
726 DeleteHash(&RSSFetchUrls);
727 DeleteHash(&RSSQueueRooms);
// Module entry point. It prepares the system context rss_CC, the queue
// mutex and both hash tables:
//  - RSSQueueRooms uses lFlathash, matching the LKEY() room-number keys.
//  - RSSFetchUrls is keyed by URL string.
// It then logs the libcurl version and registers rssclient_scan() as an
// EVT_TIMER session hook and rss_cleanup() as the cleanup hook.
731 CTDL_MODULE_INIT(rssclient)
735 CtdlFillSystemContext(&rss_CC, "rssclient");
736 pthread_mutex_init(&RSSQueueMutex, NULL);
737 RSSQueueRooms = NewHash(1, lFlathash);
738 RSSFetchUrls = NewHash(1, NULL);
739 syslog(LOG_INFO, "%s\n", curl_version());
740 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
741 CtdlRegisterCleanupHook(rss_cleanup);