68f04ab75daddf83fd0fef6c04c3b4f82cf3ae44
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2012 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71
72
73 void DeleteRoomReference(long QRnumber)
74 {
75         HashPos *At;
76         long HKLen;
77         const char *HK;
78         void *vData = NULL;
79         rss_room_counter *pRoomC;
80
81         At = GetNewHashPos(RSSQueueRooms, 0);
82
83         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
84         {
85                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
86                 if (vData != NULL)
87                 {
88                         pRoomC = (rss_room_counter *) vData;
89                         pRoomC->count --;
90                         if (pRoomC->count == 0)
91                                 DeleteEntryFromHash(RSSQueueRooms, At);
92                 }
93         }
94         DeleteHashPos(&At);
95 }
96
97 void UnlinkRooms(rss_aggregator *Cfg)
98 {
99         DeleteRoomReference(Cfg->QRnumber);
100         if (Cfg->OtherQRnumbers != NULL)
101         {
102                 long HKLen;
103                 const char *HK;
104                 HashPos *At;
105                 void *vData;
106
107                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
108                 while (! server_shutting_down &&
109                        GetNextHashPos(Cfg->OtherQRnumbers,
110                                       At,
111                                       &HKLen, &HK,
112                                       &vData) &&
113                        (vData != NULL))
114                 {
115                         long *lData = (long*) vData;
116                         DeleteRoomReference(*lData);
117                 }
118
119                 DeleteHashPos(&At);
120         }
121 }
122
123 void UnlinkRSSAggregator(rss_aggregator *Cfg)
124 {
125         HashPos *At;
126
127         pthread_mutex_lock(&RSSQueueMutex);
128         UnlinkRooms(Cfg);
129
130         At = GetNewHashPos(RSSFetchUrls, 0);
131         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
132         {
133                 DeleteEntryFromHash(RSSFetchUrls, At);
134         }
135         DeleteHashPos(&At);
136         last_run = time(NULL);
137         pthread_mutex_unlock(&RSSQueueMutex);
138 }
139
140 void DeleteRssCfg(void *vptr)
141 {
142         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
143         AsyncIO *IO = &RSSAggr->IO;
144         EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
145
146         FreeStrBuf(&RSSAggr->Url);
147         FreeStrBuf(&RSSAggr->rooms);
148         FreeStrBuf(&RSSAggr->CData);
149         FreeStrBuf(&RSSAggr->Key);
150         DeleteHash(&RSSAggr->OtherQRnumbers);
151
152         DeleteHashPos (&RSSAggr->Pos);
153         DeleteHash (&RSSAggr->Messages);
154         if (RSSAggr->recp.recp_room != NULL)
155                 free(RSSAggr->recp.recp_room);
156
157
158         if (RSSAggr->Item != NULL)
159         {
160                 flush_rss_item(RSSAggr->Item);
161
162                 free(RSSAggr->Item);
163         }
164
165         FreeAsyncIOContents(&RSSAggr->IO);
166         free(RSSAggr);
167 }
168
169 eNextState RSSAggregator_Terminate(AsyncIO *IO)
170 {
171         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
172
173         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
174
175
176         UnlinkRSSAggregator(RSSAggr);
177         return eAbort;
178 }
179
180 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
181 {
182         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
183
184         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
185
186
187         UnlinkRSSAggregator(RSSAggr);
188         return eAbort;
189 }
190
191 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
192 {
193         const char *pUrl;
194         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
195
196         pUrl = IO->ConnectMe->PlainUrl;
197         if (pUrl == NULL)
198                 pUrl = "";
199
200         EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
201
202
203         UnlinkRSSAggregator(RSSAggr);
204         return eAbort;
205 }
206
207
208 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
209 {
210         return eAbort; ///TODO
211 }
212
213 eNextState RSSSaveMessage(AsyncIO *IO)
214 {
215         long len;
216         const char *Key;
217         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
218
219         RSSAggr->ThisMsg->Msg.cm_fields['M'] =
220                 SmashStrBuf(&RSSAggr->ThisMsg->Message);
221
222         CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
223
224         /* write the uidl to the use table so we don't store this item again */
225         cdb_store(CDB_USETABLE,
226                   SKEY(RSSAggr->ThisMsg->MsgGUID),
227                   &RSSAggr->ThisMsg->ut,
228                   sizeof(struct UseTable) );
229
230         if (GetNextHashPos(RSSAggr->Messages,
231                            RSSAggr->Pos,
232                            &len, &Key,
233                            (void**) &RSSAggr->ThisMsg))
234                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
235         else
236                 return eAbort;
237 }
238
239 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
240 {
241         const char *Key;
242         long len;
243         struct cdbdata *cdbut;
244         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
245
246         /* Find out if we've already seen this item */
247         strcpy(Ctx->ThisMsg->ut.ut_msgid,
248                ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
249         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
250
251         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
252 #ifndef DEBUG_RSS
253         if (cdbut != NULL) {
254                 /* Item has already been seen */
255                 EV_syslog(LOG_DEBUG,
256                           "%s has already been seen\n",
257                           ChrPtr(Ctx->ThisMsg->MsgGUID));
258                 cdb_free(cdbut);
259
260                 /* rewrite the record anyway, to update the timestamp */
261                 cdb_store(CDB_USETABLE,
262                           SKEY(Ctx->ThisMsg->MsgGUID),
263                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
264
265                 if (GetNextHashPos(Ctx->Messages,
266                                    Ctx->Pos,
267                                    &len, &Key,
268                                    (void**) &Ctx->ThisMsg))
269                         return NextDBOperation(
270                                 IO,
271                                 RSS_FetchNetworkUsetableEntry);
272                 else
273                         return eAbort;
274         }
275         else
276 #endif
277         {
278                 NextDBOperation(IO, RSSSaveMessage);
279                 return eSendMore;
280         }
281 }
282
283 /*
284  * Begin a feed parse
285  */
286 int rss_do_fetching(rss_aggregator *Cfg)
287 {
288         rss_item *ri;
289         time_t now;
290
291         now = time(NULL);
292
293         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
294                 return 0;
295
296         ri = (rss_item*) malloc(sizeof(rss_item));
297         memset(ri, 0, sizeof(rss_item));
298         Cfg->Item = ri;
299
300         if (! InitcURLIOStruct(&Cfg->IO,
301                                Cfg,
302                                "Citadel RSS Client",
303                                RSSAggregator_ParseReply,
304                                RSSAggregator_Terminate,
305                                RSSAggregator_TerminateDB,
306                                RSSAggregator_ShutdownAbort))
307         {
308                 syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
309                 return 0;
310         }
311
312         safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host,
313                     ChrPtr(Cfg->Url),
314                     sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host));
315
316         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
317         ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80);
318         CurlPrepareURL(Cfg->IO.ConnectMe);
319
320         QueueCurlContext(&Cfg->IO);
321         return 1;
322 }
323
324 /*
325  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
326  */
327 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
328 {
329         StrBuf *CfgData=NULL;
330         StrBuf *CfgType;
331         StrBuf *Line;
332         rss_room_counter *Count = NULL;
333         struct stat statbuf;
334         char filename[PATH_MAX];
335         int fd;
336         int Done;
337         rss_aggregator *RSSAggr = NULL;
338         rss_aggregator *use_this_RSSAggr = NULL;
339         void *vptr;
340         const char *CfgPtr, *lPtr;
341         const char *Err;
342
343         pthread_mutex_lock(&RSSQueueMutex);
344         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
345         {
346                 syslog(LOG_DEBUG,
347                        "rssclient: [%ld] %s already in progress.\n",
348                        qrbuf->QRnumber,
349                        qrbuf->QRname);
350                 pthread_mutex_unlock(&RSSQueueMutex);
351                 return;
352         }
353         pthread_mutex_unlock(&RSSQueueMutex);
354
355         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
356
357         if (server_shutting_down)
358                 return;
359
360         /* Only do net processing for rooms that have netconfigs */
361         fd = open(filename, 0);
362         if (fd <= 0) {
363                 /* syslog(LOG_DEBUG,
364                    "rssclient: %s no config.\n",
365                    qrbuf->QRname); */
366                 return;
367         }
368
369         if (server_shutting_down)
370                 return;
371
372         if (fstat(fd, &statbuf) == -1) {
373                 syslog(LOG_DEBUG,
374                        "ERROR: could not stat configfile '%s' - %s\n",
375                        filename,
376                        strerror(errno));
377                 return;
378         }
379
380         if (server_shutting_down)
381                 return;
382
383         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
384
385         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
386                 close(fd);
387                 FreeStrBuf(&CfgData);
388                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
389                        filename, strerror(errno));
390                 return;
391         }
392         close(fd);
393         if (server_shutting_down)
394                 return;
395
396         CfgPtr = NULL;
397         CfgType = NewStrBuf();
398         Line = NewStrBufPlain(NULL, StrLength(CfgData));
399         Done = 0;
400         while (!Done)
401         {
402                 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
403                 if (StrLength(Line) > 0)
404                 {
405                         lPtr = NULL;
406                         StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
407                         if (!strcasecmp("rssclient", ChrPtr(CfgType)))
408                         {
409                                 if (Count == NULL)
410                                 {
411                                         Count = malloc(
412                                                 sizeof(rss_room_counter));
413                                         Count->count = 0;
414                                 }
415                                 Count->count ++;
416                                 RSSAggr = (rss_aggregator *) malloc(
417                                         sizeof(rss_aggregator));
418
419                                 memset (RSSAggr, 0, sizeof(rss_aggregator));
420                                 RSSAggr->QRnumber = qrbuf->QRnumber;
421                                 RSSAggr->roomlist_parts = 1;
422                                 RSSAggr->Url = NewStrBuf();
423
424                                 StrBufExtract_NextToken(RSSAggr->Url,
425                                                         Line,
426                                                         &lPtr,
427                                                         '|');
428
429                                 pthread_mutex_lock(&RSSQueueMutex);
430                                 GetHash(RSSFetchUrls,
431                                         SKEY(RSSAggr->Url),
432                                         &vptr);
433
434                                 use_this_RSSAggr = (rss_aggregator *)vptr;
435                                 if (use_this_RSSAggr != NULL)
436                                 {
437                                         long *QRnumber;
438                                         StrBufAppendBufPlain(
439                                                 use_this_RSSAggr->rooms,
440                                                 qrbuf->QRname,
441                                                 -1, 0);
442                                         if (use_this_RSSAggr->roomlist_parts==1)
443                                         {
444                                                 use_this_RSSAggr->OtherQRnumbers
445                                                         = NewHash(1, lFlathash);
446                                         }
447                                         QRnumber = (long*)malloc(sizeof(long));
448                                         *QRnumber = qrbuf->QRnumber;
449                                         Put(use_this_RSSAggr->OtherQRnumbers,
450                                             LKEY(qrbuf->QRnumber),
451                                             QRnumber,
452                                             NULL);
453                                         use_this_RSSAggr->roomlist_parts++;
454
455                                         pthread_mutex_unlock(&RSSQueueMutex);
456
457                                         FreeStrBuf(&RSSAggr->Url);
458                                         free(RSSAggr);
459                                         RSSAggr = NULL;
460                                         continue;
461                                 }
462                                 pthread_mutex_unlock(&RSSQueueMutex);
463
464                                 RSSAggr->ItemType = RSS_UNSET;
465
466                                 RSSAggr->rooms = NewStrBufPlain(
467                                         qrbuf->QRname, -1);
468
469                                 pthread_mutex_lock(&RSSQueueMutex);
470
471                                 Put(RSSFetchUrls,
472                                     SKEY(RSSAggr->Url),
473                                     RSSAggr,
474                                     DeleteRssCfg);
475
476                                 pthread_mutex_unlock(&RSSQueueMutex);
477                         }
478                 }
479         }
480         if (Count != NULL)
481         {
482                 Count->QRnumber = qrbuf->QRnumber;
483                 pthread_mutex_lock(&RSSQueueMutex);
484                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
485                        qrbuf->QRnumber, qrbuf->QRname);
486                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
487                 pthread_mutex_unlock(&RSSQueueMutex);
488         }
489         FreeStrBuf(&CfgData);
490         FreeStrBuf(&CfgType);
491         FreeStrBuf(&Line);
492 }
493
494 /*
495  * Scan for rooms that have RSS client requests configured
496  */
497 void rssclient_scan(void) {
498         int RSSRoomCount, RSSCount;
499         rss_aggregator *rptr = NULL;
500         void *vrptr = NULL;
501         HashPos *it;
502         long len;
503         const char *Key;
504         time_t now = time(NULL);
505
506         /* Run no more than once every 15 minutes. */
507         if ((now - last_run) < 900) {
508                 syslog(LOG_DEBUG,
509                         "rssclient: polling interval not yet reached; last run was %ldm%lds ago",
510                         ((now - last_run) / 60),
511                         ((now - last_run) % 60)
512                 );
513                 return;
514         }
515
516         /*
517          * This is a simple concurrency check to make sure only one rssclient
518          * run is done at a time.
519          */
520         pthread_mutex_lock(&RSSQueueMutex);
521         RSSCount = GetCount(RSSFetchUrls);
522         RSSRoomCount = GetCount(RSSQueueRooms);
523         pthread_mutex_unlock(&RSSQueueMutex);
524
525         if ((RSSRoomCount > 0) || (RSSCount > 0)) {
526                 syslog(LOG_DEBUG,
527                        "rssclient: concurrency check failed; %d rooms and %d url's are queued",
528                        RSSRoomCount, RSSCount
529                         );
530                 return;
531         }
532
533         become_session(&rss_CC);
534         syslog(LOG_DEBUG, "rssclient started\n");
535         CtdlForEachRoom(rssclient_scan_room, NULL);
536
537         pthread_mutex_lock(&RSSQueueMutex);
538
539         it = GetNewHashPos(RSSFetchUrls, 0);
540         while (!server_shutting_down &&
541                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
542                (vrptr != NULL)) {
543                 rptr = (rss_aggregator *)vrptr;
544                 if (!rss_do_fetching(rptr))
545                         UnlinkRSSAggregator(rptr);
546         }
547         DeleteHashPos(&it);
548         pthread_mutex_unlock(&RSSQueueMutex);
549
550         syslog(LOG_DEBUG, "rssclient ended\n");
551         return;
552 }
553
554 void rss_cleanup(void)
555 {
556         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
557         DeleteHash(&RSSFetchUrls);
558         DeleteHash(&RSSQueueRooms);
559 }
560
561
562 CTDL_MODULE_INIT(rssclient)
563 {
564         if (threading)
565         {
566                 CtdlFillSystemContext(&rss_CC, "rssclient");
567                 pthread_mutex_init(&RSSQueueMutex, NULL);
568                 RSSQueueRooms = NewHash(1, lFlathash);
569                 RSSFetchUrls = NewHash(1, NULL);
570                 syslog(LOG_INFO, "%s\n", curl_version());
571                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
572                 CtdlRegisterEVCleanupHook(rss_cleanup);
573         }
574         return "rssclient";
575 }