2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2010 by the citadel.org team
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/types.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
46 #include "citserver.h"
50 #include "ctdl_module.h"
52 #include "parsedate.h"
54 #include "citadel_dirs.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
// Indices used with msg->cm_fields in this file. The only uses visible in this
// view (TMP_SHORTER_URLS and TMP_SHORTER_URL_OFFSET) sit behind disabled
// shorter-URL code in rss_save_item; TMP_MSGDATA has no visible use here.
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
// Head pointer for a list of rssnetcfg entries. The only reference visible in
// this view is in legacy code that appears to be commented out.
// TODO confirm whether this global is still needed.
66 struct rssnetcfg *rnclist = NULL;
// AppendLink: append an HTML anchor followed by a line break to Message.
//   Message   - buffer that receives the markup
//   link      - URL used as the href; nothing is appended when it is empty
//   LinkTitle - preferred anchor text when non-empty
//   Title     - plain C string used as anchor text when LinkTitle is empty
//               and Title is non-NULL and non-empty
// NOTE(review): link and the anchor text are appended as-is; no HTML escaping
// is visible here. Confirm that callers pass trusted or pre-escaped data.
67 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
69 if (StrLength(link) > 0)
// Opening tag with the URL as the href attribute.
71 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
72 StrBufAppendBuf(Message, link, 0);
73 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
// Anchor text: LinkTitle first, then Title. The last append of link is
// presumably the fallback when neither is usable (the branch keyword for it
// is not visible in this view).
74 if (StrLength(LinkTitle) > 0)
75 StrBufAppendBuf(Message, LinkTitle, 0);
76 else if ((Title != NULL) && !IsEmptyStr(Title))
77 StrBufAppendBufPlain(Message, Title, -1, 0);
79 StrBufAppendBuf(Message, link, 0);
// Close the anchor and end the line.
80 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
// Context carried through the queued save of one RSS item.
// Only Msg and recp are declared in the visible part of this struct; other
// members used elsewhere in this file (IO, Message, MsgGUID, ut) are not
// shown in this view.
83 typedef struct __networker_save_message {
// Message being built; submitted by RSSSaveMessage and released by
// FreeNetworkSaveMessage.
85 struct CtdlMessage *Msg;
// Recipient description passed to CtdlSubmitMsg together with Msg.
86 struct recptypes *recp;
90 } networker_save_message;
// Terminate callback, installed as Ctx->IO.Terminate by RSSQueueSaveMessage.
// Releases the message, the recipient structure and the GUID buffer held by
// the save context found in IO->Data.
// The return statement, and any release of the context itself, are not
// visible in this view.
92 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
94 networker_save_message *Ctx = (networker_save_message *) IO->Data;
96 CtdlFreeMessage(Ctx->Msg);
97 free_recipients(Ctx->recp);
98 FreeStrBuf(&Ctx->MsgGUID);
// ShutdownAbort callback, installed as Ctx->IO.ShutdownAbort by
// RSSQueueSaveMessage. It currently only reports eAbort; the TODO marker
// suggests that real cleanup is still to be written.
103 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
105 return eAbort; ///TODO
// DB-queue step that stores a new RSS item; scheduled by
// FetchNetworkUsetableEntry through NextDBOperation.
// Returns eTerminateConnection after the submit and the use-table write.
108 eNextState RSSSaveMessage(AsyncIO *IO)
110 networker_save_message *Ctx = (networker_save_message *) IO->Data;
// Move the assembled body text into the M field of the message.
// SmashStrBuf presumably hands the character data over and disposes of the
// StrBuf wrapper -- verify against libcitadel.
112 Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message);
// Submit the message for the rooms described by Ctx->recp.
114 CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0);
116 /* write the uidl to the use table so we don't store this item again */
117 cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) );
119 return eTerminateConnection;
122 // TODO: relink me: ExpandShortUrls(ri->description);
// DB-queue step, queued by RSSQueueSaveMessage: looks up Ctx->MsgGUID in the
// use table to decide whether this item was stored before.
//   - Already-seen path: logs, rewrites the record to refresh its timestamp
//     and returns eTerminateConnection.
//   - Otherwise RSSSaveMessage is scheduled as the next DB operation.
// The condition that separates the two paths, and the final return value,
// are not visible in this view.
124 eNextState FetchNetworkUsetableEntry(AsyncIO *IO)
126 struct cdbdata *cdbut;
127 networker_save_message *Ctx = (networker_save_message *) IO->Data;
129 /* Find out if we've already seen this item */
// NOTE(review): unbounded strcpy. The size of ut_msgid is not visible here;
// confirm the GUID always fits or replace this with a bounded copy.
130 strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
131 Ctx->ut.ut_timestamp = time(NULL);
133 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
136 /* Item has already been seen */
137 CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
// NOTE(review): no release of cdbut is visible in this view; verify that the
// fetched record is freed on this path.
140 /* rewrite the record anyway, to update the timestamp */
141 cdb_store(CDB_USETABLE,
143 &Ctx->ut, sizeof(struct UseTable) );
144 return eTerminateConnection;
// Not seen before: continue with the actual save.
149 NextDBOperation(IO, RSSSaveMessage);
// RSSQueueSaveMessage: wrap a prepared message in a save context and queue it
// on the DB queue, starting with FetchNetworkUsetableEntry.
//   Msg         - message to store
//   recp        - recipient description for the target rooms
//   MsgGUID     - identifier used as the use-table key; stored in the context
//   MessageBody - body text; stored in the context
// The assignments of Msg, recp and the IO data pointer are not visible in
// this view.
153 void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody)
155 networker_save_message *Ctx;
// NOTE(review): the malloc result is passed straight to memset; no NULL check
// is visible between these two lines.
157 Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
158 memset(Ctx, 0, sizeof(networker_save_message));
160 Ctx->MsgGUID = MsgGUID;
161 Ctx->Message = MessageBody;
// Give the async operation its own context plus its cleanup callbacks.
165 Ctx->IO.CitContext = CloneContext(CC);
166 Ctx->IO.Terminate = FreeNetworkSaveMessage;
167 Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
168 QueueDBOperation(&Ctx->IO, FetchNetworkUsetableEntry);
173 * Commit a fetched and parsed RSS item to disk
// rss_save_item: turn one parsed feed item into a CtdlMessage plus recipient
// list and queue it for saving through RSSQueueSaveMessage.
//   ri - parsed item. Several of its buffers are modified in place here
//        (guid, description, title, author_or_creator), and description and
//        link are replaced by empty buffers when NULL.
// Returns early, without queueing anything, when the recipient structure
// cannot be allocated.
// Several local declarations (Buf, guid, Message, msglen, From, FromAt, Sbj,
// len) and a number of branch keywords are not visible in this view.
175 void rss_save_item(rss_item *ri)
178 struct MD5Context md5context;
179 u_char rawdigest[MD5_DIGEST_LEN];
180 struct CtdlMessage *msg;
181 struct recptypes *recp = NULL;
// Build the recipient structure from a copy of the room list of the item.
187 recp = (struct recptypes *) malloc(sizeof(struct recptypes));
188 if (recp == NULL) return;
189 memset(recp, 0, sizeof(struct recptypes));
190 Buf = NewStrBufDup(ri->roomlist);
191 recp->recp_room = SmashStrBuf(&Buf);
192 recp->num_room = ri->roomlist_parts;
193 recp->recptypes_magic = RECPTYPES_MAGIC;
195 /* Construct a GUID to use in the S_USETABLE table.
196 * If one is not present in the item itself, make one up.
198 if (ri->guid != NULL) {
199 StrBufSpaceToBlank(ri->guid);
200 StrBufTrim(ri->guid);
201 guid = NewStrBufPlain(HKEY("rss/"));
202 StrBufAppendBuf(guid, ri->guid, 0);
// Made-up GUID: hex form of an MD5 digest over title and link (each only
// when present), followed by a fixed suffix.
205 MD5Init(&md5context);
206 if (ri->title != NULL) {
207 MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
209 if (ri->link != NULL) {
210 MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
212 MD5Final(rawdigest, &md5context);
213 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
214 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
215 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
218 /* translate Item into message. */
219 CtdlLogPrintf(CTDL_DEBUG, "RSS: translating item...\n");
220 if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
221 StrBufSpaceToBlank(ri->description);
// NOTE(review): the malloc result is passed straight to memset; no NULL check
// is visible, and recp and guid would need releasing on such a failure path.
222 msg = malloc(sizeof(struct CtdlMessage));
223 memset(msg, 0, sizeof(struct CtdlMessage));
224 msg->cm_magic = CTDLMESSAGE_MAGIC;
225 msg->cm_anon_type = MES_NORMAL;
226 msg->cm_format_type = FMT_RFC822;
// The E field receives a copy of the item GUID when there is one.
228 if (ri->guid != NULL) {
229 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
// Author handling. The branch structure is only partly visible here. The
// visible statements convert the author text with html_to_ascii, check it for
// an at-sign, and fill the A and P fields from the author text, the author
// email, or the fixed rss@localhost address. When no author is given the A
// field is presumably set to the plain string rss -- TODO confirm against the
// complete source.
232 if (ri->author_or_creator != NULL) {
234 StrBuf *Encoded = NULL;
237 From = html_to_ascii(ChrPtr(ri->author_or_creator),
238 StrLength(ri->author_or_creator),
240 StrBufPlain(ri->author_or_creator, From, -1);
241 StrBufTrim(ri->author_or_creator);
244 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
245 if (!FromAt && StrLength (ri->author_email) > 0)
247 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
248 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
249 msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
254 msg->cm_fields['P'] = SmashStrBuf(&ri->author_or_creator);
257 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
258 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
259 msg->cm_fields['P'] = strdup("rss@localhost");
264 msg->cm_fields['A'] = strdup("rss");
// The N field is set to the local node name.
267 msg->cm_fields['N'] = strdup(NODENAME);
// Subject: the title is converted with html_to_ascii, a trailing newline is
// checked for, and the RFC 2047 encoded result is stored in the U field.
// NOTE(review): len is taken from the title before the conversion; whether it
// is refreshed from Sbj before Sbj[len - 1] is read is not visible here.
// Confirm this, and that an empty result cannot lead to an index of -1.
268 if (ri->title != NULL) {
271 StrBuf *Encoded, *QPEncoded;
274 StrBufSpaceToBlank(ri->title);
275 len = StrLength(ri->title);
276 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
278 if (Sbj[len - 1] == '\n')
283 Encoded = NewStrBufPlain(Sbj, len);
287 StrBufRFC2047encode(&QPEncoded, Encoded);
289 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
290 FreeStrBuf(&Encoded);
// The T field holds the publication date printed as a decimal number.
// NOTE(review): the malloc(64) result is used by snprintf without a visible
// NULL check, and the format assumes pubdate is a long -- TODO confirm its type.
292 msg->cm_fields['T'] = malloc(64);
293 snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
// The O field receives the channel title when it is non-empty.
294 if (ri->channel_title != NULL) {
295 if (StrLength(ri->channel_title) > 0) {
296 msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
299 if (ri->link == NULL)
300 ri->link = NewStrBufPlain(HKEY(""));
302 #if 0 /* temporarily disable shorter urls. */
303 msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
// Assemble the HTML body: content-type header, description, then the links.
306 msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
308 Message = NewStrBufPlain(NULL, StrLength(ri->description));
310 StrBufPlain(Message, HKEY(
311 "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
313 #if 0 /* disable shorter url for now. */
314 msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
316 StrBufAppendBuf(Message, ri->description, 0);
317 StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
319 AppendLink(Message, ri->link, ri->linkTitle, NULL);
320 AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
321 StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
// Pass msg, recp, guid and Message on to the asynchronous save path.
323 RSSQueueSaveMessage(msg, recp, guid, Message);
// rss_do_fetching: start an asynchronous HTTP fetch of the feed described by
// Cfg, using the evcurl helpers.
//   Cfg - feed configuration; Url is fetched and rooms becomes the room list
//         of the item state.
// Local declarations (now, ri, rssc, IO) and the statements that connect IO
// to the allocated state are not visible in this view.
331 void rss_do_fetching(rssnetcfg *Cfg) {
// Poll-time check against next_poll; the action taken when the feed is not
// yet due is not visible here (presumably an early return).
340 if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
// Zeroed item and collection state for this fetch.
// NOTE(review): both malloc results are passed to memset without a visible
// NULL check.
344 ri = (rss_item*) malloc(sizeof(rss_item));
345 rssc = (rsscollection*) malloc(sizeof(rsscollection));
346 memset(ri, 0, sizeof(rss_item));
347 memset(rssc, 0, sizeof(rsscollection));
351 IO->CitContext = CloneContext(CC);
// The item state refers to the room list of the configuration; no copy is
// made on this line.
353 ri->roomlist = Cfg->rooms;
356 CtdlLogPrintf(CTDL_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
// 80 is passed as a port argument, presumably the default when the URL names
// none -- verify against ParseURL.
357 ParseURL(&IO->ConnectMe, Cfg->Url, 80);
358 CurlPrepareURL(IO->ConnectMe);
// Set up the curl handle; a failure is logged at alert level. What else the
// failure path does is not visible here.
360 if (! evcurl_init(IO,
363 "Citadel RSS Client",
366 CtdlLogPrintf(CTDL_ALERT, "Unable to initialize libcurl.\n");
// Start the transfer.
370 evcurl_handle_start(IO);
// Shared scheduler state. Both hashes are created in the module init and
// removed in RSSCleanup.
373 citthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
// Keyed by room number (LKEY of QRnumber); values are RoomCounter entries
// added by rssclient_scan_room.
374 HashList *RSSQueueRooms = NULL;
// Keyed by feed URL; values are rssnetcfg entries registered with
// DeleteRssCfg as their destructor.
375 HashList *RSSFetchUrls = NULL;
379 while (fgets(buf, sizeof buf, fp) != NULL && !CtdlThreadCheckStop()) {
380 buf[strlen(buf)-1] = 0;
382 extract_token(instr, buf, 0, '|', sizeof instr);
383 if (!strcasecmp(instr, "rssclient")) {
385 use_this_rncptr = NULL;
387 extract_token(feedurl, buf, 1, '|', sizeof feedurl);
389 /* If any other rooms have requested the same feed, then we will just add this
390 * room to the target list for that client request.
391 * / TODO: how do we do this best?
392 for (rncptr=rnclist; rncptr!=NULL; rncptr=rncptr->next) {
393 if (!strcmp(ChrPtr(rncptr->Url), feedurl)) {
394 use_this_rncptr = rncptr;
398 /* Otherwise create a new client request * /
399 if (use_this_rncptr == NULL) {
400 rncptr = (rssnetcfg *) malloc(sizeof(rssnetcfg));
401 memset(rncptr, 0, sizeof(rssnetcfg));
402 rncptr->ItemType = RSS_UNSET;
404 rncptr->Url = NewStrBufPlain(feedurl, -1);
405 rncptr->rooms = NULL;
407 use_this_rncptr = rncptr;
411 /* Add the room name to the request * /
412 if (use_this_rncptr != NULL) {
413 if (use_this_rncptr->rooms == NULL) {
414 rncptr->rooms = strdup(qrbuf->QRname);
417 len = strlen(use_this_rncptr->rooms) + strlen(qrbuf->QRname) + 5;
418 ptr = realloc(use_this_rncptr->rooms, len);
421 strcat(ptr, qrbuf->QRname);
422 use_this_rncptr->rooms = ptr;
// Per-room entry stored in RSSQueueRooms. Its members are not visible in this
// view; rssclient_scan_room sets a QRnumber member from the room number.
430 typedef struct __RoomCounter {
// DeleteRssCfg: destructor for rssnetcfg entries, registered with Put on
// RSSFetchUrls and also called directly by rssclient_scan_room.
//   vptr - the rssnetcfg to dispose of
// Releases the Url and rooms buffers. Whether the struct itself is freed is
// not visible in this view.
437 void DeleteRssCfg(void *vptr)
439 rssnetcfg *rncptr = (rssnetcfg *)vptr;
441 FreeStrBuf(&rncptr->Url);
442 FreeStrBuf(&rncptr->rooms);
448 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
// rssclient_scan_room: per-room callback passed to CtdlForEachRoom by
// rssclient_scan. Reads the netconfig file of the room and registers every
// rssclient entry in RSSFetchUrls; the room is recorded in RSSQueueRooms.
//   qrbuf - room being scanned
//   data  - not referenced in the visible lines
// Many short lines (returns, braces, the loop header, several declarations
// and any close of fd) are not visible in this view.
450 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
455 RoomCounter *Count = NULL;
457 char filename[PATH_MAX];
463 rssnetcfg *rncptr = NULL;
464 rssnetcfg *use_this_rncptr = NULL;
468 const char *CfgPtr, *lPtr;
// Look the room up in RSSQueueRooms under the mutex. The disabled log line
// below indicates that a hit means a fetch for this room is in progress.
471 citthread_mutex_lock(&RSSQueueMutex);
472 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
474 //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s already in progress.\n", qrbuf->QRname);
475 citthread_mutex_unlock(&RSSQueueMutex);
478 citthread_mutex_unlock(&RSSQueueMutex);
// Build the path of the netconfig file for this room.
480 assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
// Shutdown checks are repeated between the steps below.
482 if (CtdlThreadCheckStop())
485 /* Only do net processing for rooms that have netconfigs */
// The flags argument is a literal 0, presumably meaning read-only (O_RDONLY
// is 0 on common platforms) -- TODO confirm and prefer the named constant.
486 fd = open(filename, 0);
488 //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
491 if (CtdlThreadCheckStop())
493 if (fstat(fd, &statbuf) == -1) {
494 CtdlLogPrintf(CTDL_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
495 filename, strerror(errno));
498 if (CtdlThreadCheckStop())
// Read the whole file into CfgData, sized from the stat result.
500 CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
501 if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
503 FreeStrBuf(&CfgData);
// NOTE(review): the Err value filled in by StrBufReadBLOB is not used in the
// visible log call, which prints strerror(errno) instead; errno may not
// describe this failure. The message text also ends in an HTML br tag.
504 CtdlLogPrintf(CTDL_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
505 filename, strerror(errno));
509 if (CtdlThreadCheckStop())
513 CfgType = NewStrBuf();
514 Line = NewStrBufPlain(NULL, StrLength(CfgData));
// Take the config one line at a time; the first pipe-separated token of each
// non-empty line is its type.
518 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
519 if (StrLength(Line) > 0)
522 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
523 if (!strcmp("rssclient", ChrPtr(CfgType)))
// NOTE(review): no NULL check is visible for this malloc or the next one.
527 Count = malloc(sizeof(RoomCounter));
// Candidate entry for this feed; the second token of the line is the URL.
531 rncptr = (rssnetcfg *) malloc(sizeof(rssnetcfg));
532 memset (rncptr, 0, sizeof(rssnetcfg));
533 rncptr->roomlist_parts = 1;
534 rncptr->Url = NewStrBuf();
535 StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
// Is this URL already registered by another room?
537 citthread_mutex_lock(&RSSQueueMutex);
538 GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
539 use_this_rncptr = (rssnetcfg *)vptr;
540 citthread_mutex_unlock(&RSSQueueMutex);
// NOTE(review): use_this_rncptr is read and modified below after
// RSSQueueMutex has been released; confirm that no other thread can touch or
// delete the entry in the meantime. Also confirm that vptr cannot hold a
// stale value when GetHash finds nothing, since its result is not checked.
542 if (use_this_rncptr != NULL)
544 /* mustn't attach to an active session */
545 if (use_this_rncptr->Attached == 1)
547 DeleteRssCfg(rncptr);
// Existing entry: append to its room list and count one more part. The
// arguments of the append are not visible in this view.
551 StrBufAppendBufPlain(use_this_rncptr->rooms,
554 use_this_rncptr->roomlist_parts++;
// New URL: finish the candidate entry and register it, with DeleteRssCfg as
// its destructor.
560 rncptr->ItemType = RSS_UNSET;
562 rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
564 citthread_mutex_lock(&RSSQueueMutex);
565 Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
566 citthread_mutex_unlock(&RSSQueueMutex);
// Record the room in RSSQueueRooms; NULL is passed as the destructor.
572 Count->QRnumber = qrbuf->QRnumber;
573 citthread_mutex_lock(&RSSQueueMutex);
574 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
575 citthread_mutex_unlock(&RSSQueueMutex);
// Release the parse buffers. A release of Line is not visible in this view.
577 FreeStrBuf(&CfgData);
578 FreeStrBuf(&CfgType);
583 * Scan for rooms that have RSS client requests configured
// rssclient_scan: timer hook (registered with EVT_TIMER in the module init).
// Scans all rooms for rssclient entries, then starts a fetch for every
// registered feed whose Attached flag is not set.
// The lines that set and clear doing_rssclient, and several declarations
// (it, len, Key, vrptr), are not visible in this view.
585 void rssclient_scan(void) {
586 static int doing_rssclient = 0;
587 rssnetcfg *rptr = NULL;
594 * This is a simple concurrency check to make sure only one rssclient run
595 * is done at a time. We could do this with a mutex, but since we
596 * don't really require extremely fine granularity here, we'll do it
597 * with a static variable instead.
599 if (doing_rssclient) return;
602 CtdlLogPrintf(CTDL_DEBUG, "rssclient started\n");
// Phase 1: collect feed URLs from every room.
603 CtdlForEachRoom(rssclient_scan_room, NULL);
// Phase 2: walk the collected URLs. rss_do_fetching is called while
// RSSQueueMutex is held.
605 citthread_mutex_lock(&RSSQueueMutex);
// NOTE(review): the position is created from RSSQueueRooms but is used to
// iterate RSSFetchUrls on the next line -- confirm this is intended. No
// release of the position is visible in this view.
607 it = GetNewHashPos(RSSQueueRooms, 0);
608 while (GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
610 rptr = (rssnetcfg *)vrptr;
611 if (!rptr->Attached) rss_do_fetching(rptr);
614 citthread_mutex_unlock(&RSSQueueMutex);
616 CtdlLogPrintf(CTDL_DEBUG, "rssclientscheduler ended\n");
// RSSCleanup: tear down the scheduler state of this module: the queue mutex
// and both hashes.
// NOTE(review): the mutex is destroyed before the hashes it guards are
// deleted; confirm that nothing can still be using them at this point.
621 void RSSCleanup(void)
623 citthread_mutex_destroy(&RSSQueueMutex);
624 DeleteHash(&RSSFetchUrls);
625 DeleteHash(&RSSQueueRooms);
// Module entry point for the RSS client: creates the queue mutex and both
// hashes, logs the libcurl version string and registers rssclient_scan as a
// timer hook. Any condition guarding these steps, and the return value, are
// not visible in this view.
629 CTDL_MODULE_INIT(rssclient)
633 citthread_mutex_init(&RSSQueueMutex, NULL);
// RSSQueueRooms is created with Flathash as its hash function, RSSFetchUrls
// with NULL, which presumably selects the default for string keys -- verify
// against NewHash.
634 RSSQueueRooms = NewHash(1, Flathash);
635 RSSFetchUrls = NewHash(1, NULL);
636 CtdlLogPrintf(CTDL_INFO, "%s\n", curl_version());
637 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);