first working RSS collection with async DB-Saves
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2010 by the citadel.org team
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35
36 #include <ctype.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <expat.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
44 #include "citadel.h"
45 #include "server.h"
46 #include "citserver.h"
47 #include "support.h"
48 #include "config.h"
49 #include "threads.h"
50 #include "ctdl_module.h"
51 #include "msgbase.h"
52 #include "parsedate.h"
53 #include "database.h"
54 #include "citadel_dirs.h"
55 #include "md5.h"
56 #include "context.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
59
60
/*
 * Scratch slots in CtdlMessage.cm_fields[].  TMP_SHORTER_URLS and
 * TMP_SHORTER_URL_OFFSET are only referenced by the currently disabled
 * (#if 0) shorter-URL code; TMP_MSGDATA is not referenced in this file.
 */
#define TMP_MSGDATA             0xFF
#define TMP_SHORTER_URL_OFFSET  0xFE
#define TMP_SHORTER_URLS        0xFD

/* NOTE(review): only referenced from the commented-out legacy scan code. */
struct rssnetcfg *rnclist = NULL;
67 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
68 {
69         if (StrLength(link) > 0)
70         {
71                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
72                 StrBufAppendBuf(Message, link, 0);
73                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
74                 if (StrLength(LinkTitle) > 0)
75                         StrBufAppendBuf(Message, LinkTitle, 0);
76                 else if ((Title != NULL) && !IsEmptyStr(Title))
77                         StrBufAppendBufPlain(Message, Title, -1, 0);
78                 else
79                         StrBufAppendBuf(Message, link, 0);
80                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
81         }
82 }
83 typedef struct __networker_save_message {
84         AsyncIO IO;
85         struct CtdlMessage *Msg;
86         struct recptypes *recp;
87         StrBuf *MsgGUID;
88         StrBuf *Message;
89         struct UseTable ut;
90 } networker_save_message;
91
92 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
93 {
94         networker_save_message *Ctx = (networker_save_message *) IO->Data;
95
96         CtdlFreeMessage(Ctx->Msg);
97         free_recipients(Ctx->recp);
98         FreeStrBuf(&Ctx->MsgGUID);
99         free(Ctx);
100         return eAbort;
101 }
102
103 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
104 {
105     return eAbort; ///TODO
106 }
107
108 eNextState RSSSaveMessage(AsyncIO *IO)
109 {
110         networker_save_message *Ctx = (networker_save_message *) IO->Data;
111
112         Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message);
113
114         CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0);
115
116         /* write the uidl to the use table so we don't store this item again */
117         cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) );
118
119         return eTerminateConnection;
120 }
121
122 // TODO: relink me:     ExpandShortUrls(ri->description);
123
124 eNextState FetchNetworkUsetableEntry(AsyncIO *IO)
125 {
126         struct cdbdata *cdbut;
127         networker_save_message *Ctx = (networker_save_message *) IO->Data;
128
129         /* Find out if we've already seen this item */
130         strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
131         Ctx->ut.ut_timestamp = time(NULL);
132
133         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
134 #ifndef DEBUG_RSS
135         if (cdbut != NULL) {
136                 /* Item has already been seen */
137                 CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
138                 cdb_free(cdbut);
139
140                 /* rewrite the record anyway, to update the timestamp */
141                 cdb_store(CDB_USETABLE, 
142                           SKEY(Ctx->MsgGUID), 
143                           &Ctx->ut, sizeof(struct UseTable) );
144                 return eTerminateConnection;
145         }
146         else
147 #endif
148         {
149                 NextDBOperation(IO, RSSSaveMessage);
150                 return eSendMore;
151         }
152 }
153 void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody)
154 {
155         networker_save_message *Ctx;
156
157         Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
158         memset(Ctx, 0, sizeof(networker_save_message));
159         
160         Ctx->MsgGUID = MsgGUID;
161         Ctx->Message = MessageBody;
162         Ctx->Msg = Msg;
163         Ctx->recp = recp;
164         Ctx->IO.Data = Ctx;
165         Ctx->IO.CitContext = CloneContext(CC);
166         Ctx->IO.Terminate = FreeNetworkSaveMessage;
167         Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
168         QueueDBOperation(&Ctx->IO, FetchNetworkUsetableEntry);
169 }
170
171
172 /*
173  * Commit a fetched and parsed RSS item to disk
174  */
175 void rss_save_item(rss_item *ri)
176 {
177
178         struct MD5Context md5context;
179         u_char rawdigest[MD5_DIGEST_LEN];
180         struct CtdlMessage *msg;
181         struct recptypes *recp = NULL;
182         int msglen = 0;
183         StrBuf *Message;
184         StrBuf *guid;
185
186         recp = (struct recptypes *) malloc(sizeof(struct recptypes));
187         if (recp == NULL) return;
188         memset(recp, 0, sizeof(struct recptypes));
189         recp->recp_room = strdup(ri->roomlist);
190         recp->num_room = num_tokens(ri->roomlist, '|');
191         recp->recptypes_magic = RECPTYPES_MAGIC;
192    
193         /* Construct a GUID to use in the S_USETABLE table.
194          * If one is not present in the item itself, make one up.
195          */
196         if (ri->guid != NULL) {
197                 StrBufSpaceToBlank(ri->guid);
198                 StrBufTrim(ri->guid);
199                 guid = NewStrBufPlain(HKEY("rss/"));
200                 StrBufAppendBuf(guid, ri->guid, 0);
201         }
202         else {
203                 MD5Init(&md5context);
204                 if (ri->title != NULL) {
205                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
206                 }
207                 if (ri->link != NULL) {
208                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
209                 }
210                 MD5Final(rawdigest, &md5context);
211                 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
212                 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
213                 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
214         }
215
216         /* translate Item into message. */
217         CtdlLogPrintf(CTDL_DEBUG, "RSS: translating item...\n");
218         if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
219         StrBufSpaceToBlank(ri->description);
220         msg = malloc(sizeof(struct CtdlMessage));
221         memset(msg, 0, sizeof(struct CtdlMessage));
222         msg->cm_magic = CTDLMESSAGE_MAGIC;
223         msg->cm_anon_type = MES_NORMAL;
224         msg->cm_format_type = FMT_RFC822;
225
226         if (ri->guid != NULL) {
227                 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
228         }
229
230         if (ri->author_or_creator != NULL) {
231                 char *From;
232                 StrBuf *Encoded = NULL;
233                 int FromAt;
234                         
235                 From = html_to_ascii(ChrPtr(ri->author_or_creator),
236                                      StrLength(ri->author_or_creator), 
237                                      512, 0);
238                 StrBufPlain(ri->author_or_creator, From, -1);
239                 StrBufTrim(ri->author_or_creator);
240                 free(From);
241
242                 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
243                 if (!FromAt && StrLength (ri->author_email) > 0)
244                 {
245                         StrBufRFC2047encode(&Encoded, ri->author_or_creator);
246                         msg->cm_fields['A'] = SmashStrBuf(&Encoded);
247                         msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
248                 }
249                 else
250                 {
251                         if (FromAt)
252                                 msg->cm_fields['P'] = SmashStrBuf(&ri->author_or_creator);
253                         else 
254                         {
255                                 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
256                                 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
257                                 msg->cm_fields['P'] = strdup("rss@localhost");
258                         }
259                 }
260         }
261         else {
262                 msg->cm_fields['A'] = strdup("rss");
263         }
264
265         msg->cm_fields['N'] = strdup(NODENAME);
266         if (ri->title != NULL) {
267                 long len;
268                 char *Sbj;
269                 StrBuf *Encoded, *QPEncoded;
270
271                 QPEncoded = NULL;
272                 StrBufSpaceToBlank(ri->title);
273                 len = StrLength(ri->title);
274                 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
275                 len = strlen(Sbj);
276                 if (Sbj[len - 1] == '\n')
277                 {
278                         len --;
279                         Sbj[len] = '\0';
280                 }
281                 Encoded = NewStrBufPlain(Sbj, len);
282                 free(Sbj);
283
284                 StrBufTrim(Encoded);
285                 StrBufRFC2047encode(&QPEncoded, Encoded);
286
287                 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
288                 FreeStrBuf(&Encoded);
289         }
290         msg->cm_fields['T'] = malloc(64);
291         snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
292         if (ri->channel_title != NULL) {
293                 if (StrLength(ri->channel_title) > 0) {
294                         msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
295                 }
296         }
297         if (ri->link == NULL) 
298                 ri->link = NewStrBufPlain(HKEY(""));
299
300 #if 0 /* temporarily disable shorter urls. */
301         msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
302 #endif
303
304         msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
305
306         Message = NewStrBufPlain(NULL, StrLength(ri->description));
307
308         StrBufPlain(Message, HKEY(
309                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
310                             "<html><body>\n"));
311 #if 0 /* disable shorter url for now. */
312         msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
313 #endif
314         StrBufAppendBuf(Message, ri->description, 0);
315         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
316
317         AppendLink(Message, ri->link, ri->linkTitle, NULL);
318         AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
319         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
320
321         RSSQueueSaveMessage(msg, recp, guid, Message);
322 }
323
324
325
326 /*
327  * Begin a feed parse
328  */
329 void rss_do_fetching(rssnetcfg *Cfg) {
330         rsscollection *rssc;
331         rss_item *ri;
332                 
333         time_t now;
334         AsyncIO *IO;
335
336         now = time(NULL);
337
338         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
339                 return;
340         Cfg->Attached = 1;
341
342         ri = (rss_item*) malloc(sizeof(rss_item));
343         rssc = (rsscollection*) malloc(sizeof(rsscollection));
344         memset(ri, 0, sizeof(rss_item));
345         memset(rssc, 0, sizeof(rsscollection));
346         rssc->Item = ri;
347         rssc->Cfg = Cfg;
348         IO = &rssc->IO;
349         IO->CitContext = CloneContext(CC);
350         IO->Data = rssc;
351         ri->roomlist = Cfg->rooms;
352
353
354         CtdlLogPrintf(CTDL_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
355         ParseURL(&IO->ConnectMe, Cfg->Url, 80);
356         CurlPrepareURL(IO->ConnectMe);
357
358         if (! evcurl_init(IO, 
359 //                        Ctx, 
360                           NULL,
361                           "Citadel RSS Client",
362                           ParseRSSReply))
363         {
364                 CtdlLogPrintf(CTDL_ALERT, "Unable to initialize libcurl.\n");
365 //              goto abort;
366         }
367
368         evcurl_handle_start(IO);
369 }
370
371 citthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
372 HashList *RSSQueueRooms = NULL;
373 HashList *RSSFetchUrls = NULL;
374
375
376 /*
377         while (fgets(buf, sizeof buf, fp) != NULL && !CtdlThreadCheckStop()) {
378                 buf[strlen(buf)-1] = 0;
379
380                 extract_token(instr, buf, 0, '|', sizeof instr);
381                 if (!strcasecmp(instr, "rssclient")) {
382
383                         use_this_rncptr = NULL;
384
385                         extract_token(feedurl, buf, 1, '|', sizeof feedurl);
386
387                         /* If any other rooms have requested the same feed, then we will just add this
388                          * room to the target list for that client request.
389                          * / TODO: how do we do this best?
390                         for (rncptr=rnclist; rncptr!=NULL; rncptr=rncptr->next) {
391                                 if (!strcmp(ChrPtr(rncptr->Url), feedurl)) {
392                                         use_this_rncptr = rncptr;
393                                 }
394                         }
395                         * /
396                         /* Otherwise create a new client request * /
397                         if (use_this_rncptr == NULL) {
398                                 rncptr = (rssnetcfg *) malloc(sizeof(rssnetcfg));
399                                 memset(rncptr, 0, sizeof(rssnetcfg));
400                                 rncptr->ItemType = RSS_UNSET;
401
402                                 rncptr->Url = NewStrBufPlain(feedurl, -1);
403                                 rncptr->rooms = NULL;
404                                 rnclist = rncptr;
405                                 use_this_rncptr = rncptr;
406
407                         }
408
409                         /* Add the room name to the request * /
410                         if (use_this_rncptr != NULL) {
411                                 if (use_this_rncptr->rooms == NULL) {
412                                         rncptr->rooms = strdup(qrbuf->QRname);
413                                 }
414                                 else {
415                                         len = strlen(use_this_rncptr->rooms) + strlen(qrbuf->QRname) + 5;
416                                         ptr = realloc(use_this_rncptr->rooms, len);
417                                         if (ptr != NULL) {
418                                                 strcat(ptr, "|");
419                                                 strcat(ptr, qrbuf->QRname);
420                                                 use_this_rncptr->rooms = ptr;
421                                         }
422                                 }
423                         }
424                 }
425
426         }
427                         */
/*
 * Per-room tally of "rssclient" lines found in a room's netconfig;
 * rssclient_scan_room stores one per room in RSSQueueRooms.
 */
struct __RoomCounter {
	int count;      /* number of rssclient entries seen */
	long QRnumber;  /* room number the tally belongs to */
};
typedef struct __RoomCounter RoomCounter;
432
433
434
435 void DeleteRssCfg(void *vptr)
436 {
437         rssnetcfg *rncptr = (rssnetcfg *)vptr;
438
439         FreeStrBuf(&rncptr->Url);
440         if (rncptr->rooms != NULL) free(rncptr->rooms);
441         free(rncptr);
442 }
443
444
445 /*
446  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
447  */
448 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
449 {
450         StrBuf *CfgData;
451         StrBuf *CfgType;
452         StrBuf *Line;
453         RoomCounter *Count = NULL;
454         struct stat statbuf;
455         char filename[PATH_MAX];
456         //char buf[1024];
457         //char instr[32];
458         int  fd;
459         int Done;
460         //char feedurl[256];
461         rssnetcfg *rncptr = NULL;
462         rssnetcfg *use_this_rncptr = NULL;
463         //int len = 0;
464         //char *ptr = NULL;
465         void *vptr;
466         const char *CfgPtr, *lPtr;
467         const char *Err;
468
469         citthread_mutex_lock(&RSSQueueMutex);
470         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
471         {
472                 //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s already in progress.\n", qrbuf->QRname);
473                 citthread_mutex_unlock(&RSSQueueMutex);
474                 return;
475         }
476         citthread_mutex_unlock(&RSSQueueMutex);
477
478         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
479
480         if (CtdlThreadCheckStop())
481                 return;
482                 
483         /* Only do net processing for rooms that have netconfigs */
484         fd = open(filename, 0);
485         if (fd <= 0) {
486                 //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
487                 return;
488         }
489         if (CtdlThreadCheckStop())
490                 return;
491         if (fstat(fd, &statbuf) == -1) {
492                 CtdlLogPrintf(CTDL_DEBUG,  "ERROR: could not stat configfile '%s' - %s\n",
493                         filename, strerror(errno));
494                 return;
495         }
496         if (CtdlThreadCheckStop())
497                 return;
498         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
499         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
500                 close(fd);
501                 FreeStrBuf(&CfgData);
502                 CtdlLogPrintf(CTDL_DEBUG,  "ERROR: reading config '%s' - %s<br>\n",
503                         filename, strerror(errno));
504                 return;
505         }
506         close(fd);
507         if (CtdlThreadCheckStop())
508                 return;
509         
510         CfgPtr = NULL;
511         CfgType = NewStrBuf();
512         Line = NewStrBufPlain(NULL, StrLength(CfgData));
513         Done = 0;
514         while (!Done)
515         {
516             Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
517             if (StrLength(Line) > 0)
518             {
519                 lPtr = NULL;
520                 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
521                 if (!strcmp("rssclient", ChrPtr(CfgType)))
522                 {
523                     if (Count == NULL)
524                     {
525                         Count = malloc(sizeof(RoomCounter));
526                         Count->count = 0;
527                     }
528                     Count->count ++;
529                     rncptr = (rssnetcfg *) malloc(sizeof(rssnetcfg));
530                     memset (rncptr, 0, sizeof(rssnetcfg));
531                     rncptr->Url = NewStrBuf();
532                     StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
533
534                     citthread_mutex_lock(&RSSQueueMutex);
535                     GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
536                     use_this_rncptr = (rssnetcfg *)vptr;
537                     citthread_mutex_unlock(&RSSQueueMutex);
538
539                     if (use_this_rncptr != NULL)
540                     {
541                         /* mustn't attach to an active session */
542                         if (use_this_rncptr->Attached == 1)
543                         {
544                             DeleteRssCfg(rncptr);
545                         }
546                         else 
547                         {
548                             /* TODO: hook us into the otherone here. */
549                         }
550
551                         continue;
552                     }
553
554                     rncptr->ItemType = RSS_UNSET;
555                                 
556                     rncptr->rooms = NULL;
557                     rncptr->rooms = strdup(qrbuf->QRname);
558
559                     citthread_mutex_lock(&RSSQueueMutex);
560                     Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
561                     citthread_mutex_unlock(&RSSQueueMutex);
562                 }
563             }
564         }
565         if (Count != NULL)
566         {
567                 Count->QRnumber = qrbuf->QRnumber;
568                 citthread_mutex_lock(&RSSQueueMutex);
569                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
570                 citthread_mutex_unlock(&RSSQueueMutex);
571         }
572         FreeStrBuf(&CfgData);
573         FreeStrBuf(&CfgType);
574         FreeStrBuf(&Line);
575 }
576
577 /*
578  * Scan for rooms that have RSS client requests configured
579  */
580 void rssclient_scan(void) {
581         static int doing_rssclient = 0;
582         rssnetcfg *rptr = NULL;
583         void *vrptr = NULL;
584         HashPos  *it;
585         long len;
586         const char *Key;
587
588         /*
589          * This is a simple concurrency check to make sure only one rssclient run
590          * is done at a time.  We could do this with a mutex, but since we
591          * don't really require extremely fine granularity here, we'll do it
592          * with a static variable instead.
593          */
594         if (doing_rssclient) return;
595         doing_rssclient = 1;
596
597         CtdlLogPrintf(CTDL_DEBUG, "rssclient started\n");
598         CtdlForEachRoom(rssclient_scan_room, NULL);
599
600         citthread_mutex_lock(&RSSQueueMutex);
601
602         it = GetNewHashPos(RSSQueueRooms, 0);
603         while (GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && 
604                (vrptr != NULL)) {
605                 rptr = (rssnetcfg *)vrptr;
606                 if (!rptr->Attached) rss_do_fetching(rptr);
607         }
608         DeleteHashPos(&it);
609         citthread_mutex_unlock(&RSSQueueMutex);
610
611         CtdlLogPrintf(CTDL_DEBUG, "rssclientscheduler ended\n");
612         doing_rssclient = 0;
613         return;
614 }
615
616 void RSSCleanup(void)
617 {
618         citthread_mutex_destroy(&RSSQueueMutex);
619         DeleteHash(&RSSFetchUrls);
620         DeleteHash(&RSSQueueRooms);
621 }
622
623
624 CTDL_MODULE_INIT(rssclient)
625 {
626         if (threading)
627         {
628                 citthread_mutex_init(&RSSQueueMutex, NULL);
629                 RSSQueueRooms = NewHash(1, Flathash);
630                 RSSFetchUrls = NewHash(1, NULL);
631                 CtdlLogPrintf(CTDL_INFO, "%s\n", curl_version());
632                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
633         }
634         return "rssclient";
635 }