bbf3c0c78431c2693773fef2f620453ce76d328b
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2012 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71
72
73 void DeleteRoomReference(long QRnumber)
74 {
75         HashPos *At;
76         long HKLen;
77         const char *HK;
78         void *vData = NULL;
79         rss_room_counter *pRoomC;
80
81         At = GetNewHashPos(RSSQueueRooms, 0);
82
83         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
84         {
85                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
86                 if (vData != NULL)
87                 {
88                         pRoomC = (rss_room_counter *) vData;
89                         pRoomC->count --;
90                         if (pRoomC->count == 0)
91                                 DeleteEntryFromHash(RSSQueueRooms, At);
92                 }
93         }
94         DeleteHashPos(&At);
95 }
96
97 void UnlinkRooms(rss_aggregator *Cfg)
98 {
99         DeleteRoomReference(Cfg->QRnumber);
100         if (Cfg->OtherQRnumbers != NULL)
101         {
102                 long HKLen;
103                 const char *HK;
104                 HashPos *At;
105                 void *vData;
106
107                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
108                 while (! server_shutting_down &&
109                        GetNextHashPos(Cfg->OtherQRnumbers,
110                                       At,
111                                       &HKLen, &HK,
112                                       &vData) &&
113                        (vData != NULL))
114                 {
115                         long *lData = (long*) vData;
116                         DeleteRoomReference(*lData);
117                 }
118
119                 DeleteHashPos(&At);
120         }
121 }
122
123 void UnlinkRSSAggregator(rss_aggregator *Cfg)
124 {
125         HashPos *At;
126
127         UnlinkRooms(Cfg);
128
129         At = GetNewHashPos(RSSFetchUrls, 0);
130         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
131         {
132                 DeleteEntryFromHash(RSSFetchUrls, At);
133         }
134         DeleteHashPos(&At);
135         last_run = time(NULL);
136 }
137
138 void DeleteRssCfg(void *vptr)
139 {
140         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
141         AsyncIO *IO = &RSSAggr->IO;
142         EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
143
144         FreeStrBuf(&RSSAggr->Url);
145         FreeStrBuf(&RSSAggr->rooms);
146         FreeStrBuf(&RSSAggr->CData);
147         FreeStrBuf(&RSSAggr->Key);
148         DeleteHash(&RSSAggr->OtherQRnumbers);
149
150         DeleteHashPos (&RSSAggr->Pos);
151         DeleteHash (&RSSAggr->Messages);
152         if (RSSAggr->recp.recp_room != NULL)
153                 free(RSSAggr->recp.recp_room);
154
155
156         if (RSSAggr->Item != NULL)
157         {
158                 flush_rss_item(RSSAggr->Item);
159
160                 free(RSSAggr->Item);
161         }
162
163         FreeAsyncIOContents(&RSSAggr->IO);
164         free(RSSAggr);
165 }
166
167 eNextState RSSAggregator_Terminate(AsyncIO *IO)
168 {
169         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
170
171         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
172
173
174         UnlinkRSSAggregator(RSSAggr);
175         return eAbort;
176 }
177
178 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
179 {
180         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
181
182         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
183
184
185         UnlinkRSSAggregator(RSSAggr);
186         return eAbort;
187 }
188
189 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
190 {
191         const char *pUrl;
192         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
193
194         pUrl = IO->ConnectMe->PlainUrl;
195         if (pUrl == NULL)
196                 pUrl = "";
197
198         EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
199
200
201         UnlinkRSSAggregator(RSSAggr);
202         return eAbort;
203 }
204
205
206 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
207 {
208         return eAbort; ///TODO
209 }
210
211 eNextState RSSSaveMessage(AsyncIO *IO)
212 {
213         long len;
214         const char *Key;
215         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
216
217         RSSAggr->ThisMsg->Msg.cm_fields['M'] =
218                 SmashStrBuf(&RSSAggr->ThisMsg->Message);
219
220         CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
221
222         /* write the uidl to the use table so we don't store this item again */
223         cdb_store(CDB_USETABLE,
224                   SKEY(RSSAggr->ThisMsg->MsgGUID),
225                   &RSSAggr->ThisMsg->ut,
226                   sizeof(struct UseTable) );
227
228         if (GetNextHashPos(RSSAggr->Messages,
229                            RSSAggr->Pos,
230                            &len, &Key,
231                            (void**) &RSSAggr->ThisMsg))
232                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
233         else
234                 return eAbort;
235 }
236
237 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
238 {
239         const char *Key;
240         long len;
241         struct cdbdata *cdbut;
242         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
243
244         /* Find out if we've already seen this item */
245         strcpy(Ctx->ThisMsg->ut.ut_msgid,
246                ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
247         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
248
249         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
250 #ifndef DEBUG_RSS
251         if (cdbut != NULL) {
252                 /* Item has already been seen */
253                 EV_syslog(LOG_DEBUG,
254                           "%s has already been seen\n",
255                           ChrPtr(Ctx->ThisMsg->MsgGUID));
256                 cdb_free(cdbut);
257
258                 /* rewrite the record anyway, to update the timestamp */
259                 cdb_store(CDB_USETABLE,
260                           SKEY(Ctx->ThisMsg->MsgGUID),
261                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
262
263                 if (GetNextHashPos(Ctx->Messages,
264                                    Ctx->Pos,
265                                    &len, &Key,
266                                    (void**) &Ctx->ThisMsg))
267                         return NextDBOperation(
268                                 IO,
269                                 RSS_FetchNetworkUsetableEntry);
270                 else
271                         return eAbort;
272         }
273         else
274 #endif
275         {
276                 NextDBOperation(IO, RSSSaveMessage);
277                 return eSendMore;
278         }
279 }
280
281 /*
282  * Begin a feed parse
283  */
284 int rss_do_fetching(rss_aggregator *Cfg)
285 {
286         rss_item *ri;
287         time_t now;
288
289         now = time(NULL);
290
291         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
292                 return 0;
293
294         ri = (rss_item*) malloc(sizeof(rss_item));
295         memset(ri, 0, sizeof(rss_item));
296         Cfg->Item = ri;
297
298         if (! InitcURLIOStruct(&Cfg->IO,
299                                Cfg,
300                                "Citadel RSS Client",
301                                RSSAggregator_ParseReply,
302                                RSSAggregator_Terminate,
303                                RSSAggregator_TerminateDB,
304                                RSSAggregator_ShutdownAbort))
305         {
306                 syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
307                 return 0;
308         }
309
310         safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host,
311                     ChrPtr(Cfg->Url),
312                     sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host));
313
314         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
315         ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80);
316         CurlPrepareURL(Cfg->IO.ConnectMe);
317
318         QueueCurlContext(&Cfg->IO);
319         return 1;
320 }
321
322 /*
323  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
324  */
325 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
326 {
327         StrBuf *CfgData=NULL;
328         StrBuf *CfgType;
329         StrBuf *Line;
330         rss_room_counter *Count = NULL;
331         struct stat statbuf;
332         char filename[PATH_MAX];
333         int fd;
334         int Done;
335         rss_aggregator *RSSAggr = NULL;
336         rss_aggregator *use_this_RSSAggr = NULL;
337         void *vptr;
338         const char *CfgPtr, *lPtr;
339         const char *Err;
340
341         pthread_mutex_lock(&RSSQueueMutex);
342         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
343         {
344                 syslog(LOG_DEBUG,
345                        "rssclient: [%ld] %s already in progress.\n",
346                        qrbuf->QRnumber,
347                        qrbuf->QRname);
348                 pthread_mutex_unlock(&RSSQueueMutex);
349                 return;
350         }
351         pthread_mutex_unlock(&RSSQueueMutex);
352
353         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
354
355         if (server_shutting_down)
356                 return;
357
358         /* Only do net processing for rooms that have netconfigs */
359         fd = open(filename, 0);
360         if (fd <= 0) {
361                 /* syslog(LOG_DEBUG,
362                    "rssclient: %s no config.\n",
363                    qrbuf->QRname); */
364                 return;
365         }
366
367         if (server_shutting_down)
368                 return;
369
370         if (fstat(fd, &statbuf) == -1) {
371                 syslog(LOG_DEBUG,
372                        "ERROR: could not stat configfile '%s' - %s\n",
373                        filename,
374                        strerror(errno));
375                 return;
376         }
377
378         if (server_shutting_down)
379                 return;
380
381         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
382
383         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
384                 close(fd);
385                 FreeStrBuf(&CfgData);
386                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
387                        filename, strerror(errno));
388                 return;
389         }
390         close(fd);
391         if (server_shutting_down)
392                 return;
393
394         CfgPtr = NULL;
395         CfgType = NewStrBuf();
396         Line = NewStrBufPlain(NULL, StrLength(CfgData));
397         Done = 0;
398         while (!Done)
399         {
400                 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
401                 if (StrLength(Line) > 0)
402                 {
403                         lPtr = NULL;
404                         StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
405                         if (!strcasecmp("rssclient", ChrPtr(CfgType)))
406                         {
407                                 if (Count == NULL)
408                                 {
409                                         Count = malloc(
410                                                 sizeof(rss_room_counter));
411                                         Count->count = 0;
412                                 }
413                                 Count->count ++;
414                                 RSSAggr = (rss_aggregator *) malloc(
415                                         sizeof(rss_aggregator));
416
417                                 memset (RSSAggr, 0, sizeof(rss_aggregator));
418                                 RSSAggr->QRnumber = qrbuf->QRnumber;
419                                 RSSAggr->roomlist_parts = 1;
420                                 RSSAggr->Url = NewStrBuf();
421
422                                 StrBufExtract_NextToken(RSSAggr->Url,
423                                                         Line,
424                                                         &lPtr,
425                                                         '|');
426
427                                 pthread_mutex_lock(&RSSQueueMutex);
428                                 GetHash(RSSFetchUrls,
429                                         SKEY(RSSAggr->Url),
430                                         &vptr);
431
432                                 use_this_RSSAggr = (rss_aggregator *)vptr;
433                                 if (use_this_RSSAggr != NULL)
434                                 {
435                                         long *QRnumber;
436                                         StrBufAppendBufPlain(
437                                                 use_this_RSSAggr->rooms,
438                                                 qrbuf->QRname,
439                                                 -1, 0);
440                                         if (use_this_RSSAggr->roomlist_parts==1)
441                                         {
442                                                 use_this_RSSAggr->OtherQRnumbers
443                                                         = NewHash(1, lFlathash);
444                                         }
445                                         QRnumber = (long*)malloc(sizeof(long));
446                                         *QRnumber = qrbuf->QRnumber;
447                                         Put(use_this_RSSAggr->OtherQRnumbers,
448                                             LKEY(qrbuf->QRnumber),
449                                             QRnumber,
450                                             NULL);
451                                         use_this_RSSAggr->roomlist_parts++;
452
453                                         pthread_mutex_unlock(&RSSQueueMutex);
454
455                                         FreeStrBuf(&RSSAggr->Url);
456                                         free(RSSAggr);
457                                         RSSAggr = NULL;
458                                         continue;
459                                 }
460                                 pthread_mutex_unlock(&RSSQueueMutex);
461
462                                 RSSAggr->ItemType = RSS_UNSET;
463
464                                 RSSAggr->rooms = NewStrBufPlain(
465                                         qrbuf->QRname, -1);
466
467                                 pthread_mutex_lock(&RSSQueueMutex);
468
469                                 Put(RSSFetchUrls,
470                                     SKEY(RSSAggr->Url),
471                                     RSSAggr,
472                                     DeleteRssCfg);
473
474                                 pthread_mutex_unlock(&RSSQueueMutex);
475                         }
476                 }
477         }
478         if (Count != NULL)
479         {
480                 Count->QRnumber = qrbuf->QRnumber;
481                 pthread_mutex_lock(&RSSQueueMutex);
482                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
483                        qrbuf->QRnumber, qrbuf->QRname);
484                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
485                 pthread_mutex_unlock(&RSSQueueMutex);
486         }
487         FreeStrBuf(&CfgData);
488         FreeStrBuf(&CfgType);
489         FreeStrBuf(&Line);
490 }
491
492 /*
493  * Scan for rooms that have RSS client requests configured
494  */
495 void rssclient_scan(void) {
496         rss_aggregator *rptr = NULL;
497         void *vrptr = NULL;
498         HashPos *it;
499         long len;
500         const char *Key;
501         time_t now = time(NULL);
502
503         /* Run no more than once every 15 minutes. */
504         if ((now - last_run) < 900) {
505                 syslog(LOG_DEBUG,
506                         "rssclient: polling interval not yet reached; last run was %ldm%lds ago",
507                         ((now - last_run) / 60),
508                         ((now - last_run) % 60)
509                 );
510                 return;
511         }
512
513         /*
514          * This is a simple concurrency check to make sure only one rssclient
515          * run is done at a time.We could do this with a mutex, but since we
516          * don't really require extremely fine granularity here, we'll do it
517          * with a static variable instead.
518          */
519
520         if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0)) {
521                 syslog(LOG_DEBUG,
522                         "rssclient: concurrency check failed; %d rooms and %d url's are queued",
523                         GetCount(RSSQueueRooms),
524                         GetCount(RSSFetchUrls)
525                 );
526                 return;
527         }
528
529         become_session(&rss_CC);
530         syslog(LOG_DEBUG, "rssclient started\n");
531         CtdlForEachRoom(rssclient_scan_room, NULL);
532
533         pthread_mutex_lock(&RSSQueueMutex);
534
535         it = GetNewHashPos(RSSFetchUrls, 0);
536         while (!server_shutting_down &&
537                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
538                (vrptr != NULL)) {
539                 rptr = (rss_aggregator *)vrptr;
540                 if (!rss_do_fetching(rptr))
541                         UnlinkRSSAggregator(rptr);
542         }
543         DeleteHashPos(&it);
544         pthread_mutex_unlock(&RSSQueueMutex);
545
546         syslog(LOG_DEBUG, "rssclient ended\n");
547         return;
548 }
549
550 void rss_cleanup(void)
551 {
552         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
553         DeleteHash(&RSSFetchUrls);
554         DeleteHash(&RSSQueueRooms);
555 }
556
557
558 CTDL_MODULE_INIT(rssclient)
559 {
560         if (threading)
561         {
562                 CtdlFillSystemContext(&rss_CC, "rssclient");
563                 pthread_mutex_init(&RSSQueueMutex, NULL);
564                 RSSQueueRooms = NewHash(1, lFlathash);
565                 RSSFetchUrls = NewHash(1, NULL);
566                 syslog(LOG_INFO, "%s\n", curl_version());
567                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
568                 CtdlRegisterEVCleanupHook(rss_cleanup);
569         }
570         return "rssclient";
571 }