2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2010 by the citadel.org team
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/types.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
46 #include "citserver.h"
50 #include "ctdl_module.h"
52 #include "parsedate.h"
54 #include "citadel_dirs.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
66 struct rssnetcfg *rnclist = NULL;
67 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
69 if (StrLength(link) > 0)
71 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
72 StrBufAppendBuf(Message, link, 0);
73 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
74 if (StrLength(LinkTitle) > 0)
75 StrBufAppendBuf(Message, LinkTitle, 0);
76 else if ((Title != NULL) && !IsEmptyStr(Title))
77 StrBufAppendBufPlain(Message, Title, -1, 0);
79 StrBufAppendBuf(Message, link, 0);
80 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
83 typedef struct __networker_save_message {
85 struct CtdlMessage *Msg;
86 struct recptypes *recp;
90 } networker_save_message;
92 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
94 networker_save_message *Ctx = (networker_save_message *) IO->Data;
96 CtdlFreeMessage(Ctx->Msg);
97 free_recipients(Ctx->recp);
98 FreeStrBuf(&Ctx->MsgGUID);
103 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
105 return eAbort; ///TODO
108 eNextState RSSSaveMessage(AsyncIO *IO)
110 networker_save_message *Ctx = (networker_save_message *) IO->Data;
112 Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message);
114 CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0);
116 /* write the uidl to the use table so we don't store this item again */
117 cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) );
119 return eTerminateConnection;
122 // TODO: relink me: ExpandShortUrls(ri->description);
124 eNextState FetchNetworkUsetableEntry(AsyncIO *IO)
126 struct cdbdata *cdbut;
127 networker_save_message *Ctx = (networker_save_message *) IO->Data;
129 /* Find out if we've already seen this item */
130 strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
131 Ctx->ut.ut_timestamp = time(NULL);
133 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
136 /* Item has already been seen */
137 CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
140 /* rewrite the record anyway, to update the timestamp */
141 cdb_store(CDB_USETABLE,
143 &Ctx->ut, sizeof(struct UseTable) );
144 return eTerminateConnection;
149 NextDBOperation(IO, RSSSaveMessage);
153 void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody)
155 networker_save_message *Ctx;
157 Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
158 memset(Ctx, 0, sizeof(networker_save_message));
160 Ctx->MsgGUID = MsgGUID;
161 Ctx->Message = MessageBody;
165 Ctx->IO.CitContext = CloneContext(CC);
166 Ctx->IO.Terminate = FreeNetworkSaveMessage;
167 Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
168 QueueDBOperation(&Ctx->IO, FetchNetworkUsetableEntry);
173 * Commit a fetched and parsed RSS item to disk
175 void rss_save_item(rss_item *ri)
178 struct MD5Context md5context;
179 u_char rawdigest[MD5_DIGEST_LEN];
180 struct CtdlMessage *msg;
181 struct recptypes *recp = NULL;
186 recp = (struct recptypes *) malloc(sizeof(struct recptypes));
187 if (recp == NULL) return;
188 memset(recp, 0, sizeof(struct recptypes));
189 recp->recp_room = strdup(ri->roomlist);
190 recp->num_room = num_tokens(ri->roomlist, '|');
191 recp->recptypes_magic = RECPTYPES_MAGIC;
193 /* Construct a GUID to use in the S_USETABLE table.
194 * If one is not present in the item itself, make one up.
196 if (ri->guid != NULL) {
197 StrBufSpaceToBlank(ri->guid);
198 StrBufTrim(ri->guid);
199 guid = NewStrBufPlain(HKEY("rss/"));
200 StrBufAppendBuf(guid, ri->guid, 0);
203 MD5Init(&md5context);
204 if (ri->title != NULL) {
205 MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
207 if (ri->link != NULL) {
208 MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
210 MD5Final(rawdigest, &md5context);
211 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
212 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
213 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
216 /* translate Item into message. */
217 CtdlLogPrintf(CTDL_DEBUG, "RSS: translating item...\n");
218 if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
219 StrBufSpaceToBlank(ri->description);
220 msg = malloc(sizeof(struct CtdlMessage));
221 memset(msg, 0, sizeof(struct CtdlMessage));
222 msg->cm_magic = CTDLMESSAGE_MAGIC;
223 msg->cm_anon_type = MES_NORMAL;
224 msg->cm_format_type = FMT_RFC822;
226 if (ri->guid != NULL) {
227 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
230 if (ri->author_or_creator != NULL) {
232 StrBuf *Encoded = NULL;
235 From = html_to_ascii(ChrPtr(ri->author_or_creator),
236 StrLength(ri->author_or_creator),
238 StrBufPlain(ri->author_or_creator, From, -1);
239 StrBufTrim(ri->author_or_creator);
242 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
243 if (!FromAt && StrLength (ri->author_email) > 0)
245 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
246 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
247 msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
252 msg->cm_fields['P'] = SmashStrBuf(&ri->author_or_creator);
255 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
256 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
257 msg->cm_fields['P'] = strdup("rss@localhost");
262 msg->cm_fields['A'] = strdup("rss");
265 msg->cm_fields['N'] = strdup(NODENAME);
266 if (ri->title != NULL) {
269 StrBuf *Encoded, *QPEncoded;
272 StrBufSpaceToBlank(ri->title);
273 len = StrLength(ri->title);
274 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
276 if (Sbj[len - 1] == '\n')
281 Encoded = NewStrBufPlain(Sbj, len);
285 StrBufRFC2047encode(&QPEncoded, Encoded);
287 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
288 FreeStrBuf(&Encoded);
290 msg->cm_fields['T'] = malloc(64);
291 snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
292 if (ri->channel_title != NULL) {
293 if (StrLength(ri->channel_title) > 0) {
294 msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
297 if (ri->link == NULL)
298 ri->link = NewStrBufPlain(HKEY(""));
300 #if 0 /* temporarily disable shorter urls. */
301 msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
304 msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
306 Message = NewStrBufPlain(NULL, StrLength(ri->description));
308 StrBufPlain(Message, HKEY(
309 "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
311 #if 0 /* disable shorter url for now. */
312 msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
314 StrBufAppendBuf(Message, ri->description, 0);
315 StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
317 AppendLink(Message, ri->link, ri->linkTitle, NULL);
318 AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
319 StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
321 RSSQueueSaveMessage(msg, recp, guid, Message);
329 void rss_do_fetching(rssnetcfg *Cfg) {
338 if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
342 ri = (rss_item*) malloc(sizeof(rss_item));
343 rssc = (rsscollection*) malloc(sizeof(rsscollection));
344 memset(ri, 0, sizeof(rss_item));
345 memset(rssc, 0, sizeof(rsscollection));
349 IO->CitContext = CloneContext(CC);
351 ri->roomlist = Cfg->rooms;
354 CtdlLogPrintf(CTDL_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
355 ParseURL(&IO->ConnectMe, Cfg->Url, 80);
356 CurlPrepareURL(IO->ConnectMe);
358 if (! evcurl_init(IO,
361 "Citadel RSS Client",
364 CtdlLogPrintf(CTDL_ALERT, "Unable to initialize libcurl.\n");
368 evcurl_handle_start(IO);
371 citthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
372 HashList *RSSQueueRooms = NULL;
373 HashList *RSSFetchUrls = NULL;
377 while (fgets(buf, sizeof buf, fp) != NULL && !CtdlThreadCheckStop()) {
378 buf[strlen(buf)-1] = 0;
380 extract_token(instr, buf, 0, '|', sizeof instr);
381 if (!strcasecmp(instr, "rssclient")) {
383 use_this_rncptr = NULL;
385 extract_token(feedurl, buf, 1, '|', sizeof feedurl);
387 /* If any other rooms have requested the same feed, then we will just add this
388 * room to the target list for that client request.
389 * / TODO: how do we do this best?
390 for (rncptr=rnclist; rncptr!=NULL; rncptr=rncptr->next) {
391 if (!strcmp(ChrPtr(rncptr->Url), feedurl)) {
392 use_this_rncptr = rncptr;
396 /* Otherwise create a new client request * /
397 if (use_this_rncptr == NULL) {
398 rncptr = (rssnetcfg *) malloc(sizeof(rssnetcfg));
399 memset(rncptr, 0, sizeof(rssnetcfg));
400 rncptr->ItemType = RSS_UNSET;
402 rncptr->Url = NewStrBufPlain(feedurl, -1);
403 rncptr->rooms = NULL;
405 use_this_rncptr = rncptr;
409 /* Add the room name to the request * /
410 if (use_this_rncptr != NULL) {
411 if (use_this_rncptr->rooms == NULL) {
412 rncptr->rooms = strdup(qrbuf->QRname);
415 len = strlen(use_this_rncptr->rooms) + strlen(qrbuf->QRname) + 5;
416 ptr = realloc(use_this_rncptr->rooms, len);
419 strcat(ptr, qrbuf->QRname);
420 use_this_rncptr->rooms = ptr;
428 typedef struct __RoomCounter {
435 void DeleteRssCfg(void *vptr)
437 rssnetcfg *rncptr = (rssnetcfg *)vptr;
439 FreeStrBuf(&rncptr->Url);
440 if (rncptr->rooms != NULL) free(rncptr->rooms);
446 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
448 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
453 RoomCounter *Count = NULL;
455 char filename[PATH_MAX];
461 rssnetcfg *rncptr = NULL;
462 rssnetcfg *use_this_rncptr = NULL;
466 const char *CfgPtr, *lPtr;
469 citthread_mutex_lock(&RSSQueueMutex);
470 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
472 //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s already in progress.\n", qrbuf->QRname);
473 citthread_mutex_unlock(&RSSQueueMutex);
476 citthread_mutex_unlock(&RSSQueueMutex);
478 assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
480 if (CtdlThreadCheckStop())
483 /* Only do net processing for rooms that have netconfigs */
484 fd = open(filename, 0);
486 //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
489 if (CtdlThreadCheckStop())
491 if (fstat(fd, &statbuf) == -1) {
492 CtdlLogPrintf(CTDL_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
493 filename, strerror(errno));
496 if (CtdlThreadCheckStop())
498 CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
499 if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
501 FreeStrBuf(&CfgData);
502 CtdlLogPrintf(CTDL_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
503 filename, strerror(errno));
507 if (CtdlThreadCheckStop())
511 CfgType = NewStrBuf();
512 Line = NewStrBufPlain(NULL, StrLength(CfgData));
516 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
517 if (StrLength(Line) > 0)
520 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
521 if (!strcmp("rssclient", ChrPtr(CfgType)))
525 Count = malloc(sizeof(RoomCounter));
529 rncptr = (rssnetcfg *) malloc(sizeof(rssnetcfg));
530 memset (rncptr, 0, sizeof(rssnetcfg));
531 rncptr->Url = NewStrBuf();
532 StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
534 citthread_mutex_lock(&RSSQueueMutex);
535 GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
536 use_this_rncptr = (rssnetcfg *)vptr;
537 citthread_mutex_unlock(&RSSQueueMutex);
539 if (use_this_rncptr != NULL)
541 /* mustn't attach to an active session */
542 if (use_this_rncptr->Attached == 1)
544 DeleteRssCfg(rncptr);
548 /* TODO: hook us into the otherone here. */
554 rncptr->ItemType = RSS_UNSET;
556 rncptr->rooms = NULL;
557 rncptr->rooms = strdup(qrbuf->QRname);
559 citthread_mutex_lock(&RSSQueueMutex);
560 Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
561 citthread_mutex_unlock(&RSSQueueMutex);
567 Count->QRnumber = qrbuf->QRnumber;
568 citthread_mutex_lock(&RSSQueueMutex);
569 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
570 citthread_mutex_unlock(&RSSQueueMutex);
572 FreeStrBuf(&CfgData);
573 FreeStrBuf(&CfgType);
578 * Scan for rooms that have RSS client requests configured
580 void rssclient_scan(void) {
581 static int doing_rssclient = 0;
582 rssnetcfg *rptr = NULL;
589 * This is a simple concurrency check to make sure only one rssclient run
590 * is done at a time. We could do this with a mutex, but since we
591 * don't really require extremely fine granularity here, we'll do it
592 * with a static variable instead.
594 if (doing_rssclient) return;
597 CtdlLogPrintf(CTDL_DEBUG, "rssclient started\n");
598 CtdlForEachRoom(rssclient_scan_room, NULL);
600 citthread_mutex_lock(&RSSQueueMutex);
602 it = GetNewHashPos(RSSQueueRooms, 0);
603 while (GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
605 rptr = (rssnetcfg *)vrptr;
606 if (!rptr->Attached) rss_do_fetching(rptr);
609 citthread_mutex_unlock(&RSSQueueMutex);
611 CtdlLogPrintf(CTDL_DEBUG, "rssclientscheduler ended\n");
616 void RSSCleanup(void)
618 citthread_mutex_destroy(&RSSQueueMutex);
619 DeleteHash(&RSSFetchUrls);
620 DeleteHash(&RSSQueueRooms);
624 CTDL_MODULE_INIT(rssclient)
628 citthread_mutex_init(&RSSQueueMutex, NULL);
629 RSSQueueRooms = NewHash(1, Flathash);
630 RSSFetchUrls = NewHash(1, NULL);
631 CtdlLogPrintf(CTDL_INFO, "%s\n", curl_version());
632 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);