f6561e6b1fd1e1a13c0ab6130c76c41376031ebd
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2012 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71 int RSSClientDebugEnabled = 0;
72 #define N ((rss_aggregator*)IO->Data)->QRnumber
73
74 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
75
76 #define EVRSSC_syslog(LEVEL, FORMAT, ...)                               \
77         DBGLOG(LEVEL) syslog(LEVEL,                                     \
78                              "IO[%ld]CC[%d][%ld]RSS" FORMAT,            \
79                              IO->ID, CCID, N, __VA_ARGS__)
80
81 #define EVRSSCM_syslog(LEVEL, FORMAT)                                   \
82         DBGLOG(LEVEL) syslog(LEVEL,                                     \
83                              "IO[%ld]CC[%d][%ld]RSS" FORMAT,            \
84                              IO->ID, CCID, N)
85
86 #define EVRSSQ_syslog(LEVEL, FORMAT, ...)                               \
87         DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT,                       \
88                              __VA_ARGS__)
89 #define EVRSSQM_syslog(LEVEL, FORMAT)                   \
90         DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
91
92 #define EVRSSCSM_syslog(LEVEL, FORMAT)                                  \
93         DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT,           \
94                              IO->ID, N)
95
96 void DeleteRoomReference(long QRnumber)
97 {
98         HashPos *At;
99         long HKLen;
100         const char *HK;
101         void *vData = NULL;
102         rss_room_counter *pRoomC;
103
104         At = GetNewHashPos(RSSQueueRooms, 0);
105
106         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
107         {
108                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
109                 if (vData != NULL)
110                 {
111                         pRoomC = (rss_room_counter *) vData;
112                         pRoomC->count --;
113                         if (pRoomC->count == 0)
114                                 DeleteEntryFromHash(RSSQueueRooms, At);
115                 }
116         }
117         DeleteHashPos(&At);
118 }
119
120 void UnlinkRooms(rss_aggregator *RSSAggr)
121 {
122         DeleteRoomReference(RSSAggr->QRnumber);
123         if (RSSAggr->OtherQRnumbers != NULL)
124         {
125                 long HKLen;
126                 const char *HK;
127                 HashPos *At;
128                 void *vData;
129
130                 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
131                 while (! server_shutting_down &&
132                        GetNextHashPos(RSSAggr->OtherQRnumbers,
133                                       At,
134                                       &HKLen, &HK,
135                                       &vData) &&
136                        (vData != NULL))
137                 {
138                         long *lData = (long*) vData;
139                         DeleteRoomReference(*lData);
140                 }
141
142                 DeleteHashPos(&At);
143         }
144 }
145
146 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
147 {
148         HashPos *At;
149
150         pthread_mutex_lock(&RSSQueueMutex);
151         UnlinkRooms(RSSAggr);
152
153         At = GetNewHashPos(RSSFetchUrls, 0);
154         if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
155         {
156                 DeleteEntryFromHash(RSSFetchUrls, At);
157         }
158         DeleteHashPos(&At);
159         last_run = time(NULL);
160         pthread_mutex_unlock(&RSSQueueMutex);
161 }
162
163 void DeleteRssCfg(void *vptr)
164 {
165         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
166         AsyncIO *IO = &RSSAggr->IO;
167         EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
168
169         FreeStrBuf(&RSSAggr->Url);
170         FreeStrBuf(&RSSAggr->rooms);
171         FreeStrBuf(&RSSAggr->CData);
172         FreeStrBuf(&RSSAggr->Key);
173         DeleteHash(&RSSAggr->OtherQRnumbers);
174
175         DeleteHashPos (&RSSAggr->Pos);
176         DeleteHash (&RSSAggr->Messages);
177         if (RSSAggr->recp.recp_room != NULL)
178                 free(RSSAggr->recp.recp_room);
179
180
181         if (RSSAggr->Item != NULL)
182         {
183                 flush_rss_item(RSSAggr->Item);
184
185                 free(RSSAggr->Item);
186         }
187
188         FreeAsyncIOContents(&RSSAggr->IO);
189         free(RSSAggr);
190 }
191
192 eNextState RSSAggregator_Terminate(AsyncIO *IO)
193 {
194         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
195
196         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
197
198
199         UnlinkRSSAggregator(RSSAggr);
200         return eAbort;
201 }
202
203 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
204 {
205         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
206
207         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
208
209
210         UnlinkRSSAggregator(RSSAggr);
211         return eAbort;
212 }
213
214 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
215 {
216         const char *pUrl;
217         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
218
219         pUrl = IO->ConnectMe->PlainUrl;
220         if (pUrl == NULL)
221                 pUrl = "";
222
223         EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
224
225
226         UnlinkRSSAggregator(RSSAggr);
227         return eAbort;
228 }
229
230
231 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
232 {
233         return eAbort; ///TODO
234 }
235
236 eNextState RSSSaveMessage(AsyncIO *IO)
237 {
238         long len;
239         const char *Key;
240         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
241
242         RSSAggr->ThisMsg->Msg.cm_fields['M'] =
243                 SmashStrBuf(&RSSAggr->ThisMsg->Message);
244
245         CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
246
247         /* write the uidl to the use table so we don't store this item again */
248         cdb_store(CDB_USETABLE,
249                   SKEY(RSSAggr->ThisMsg->MsgGUID),
250                   &RSSAggr->ThisMsg->ut,
251                   sizeof(struct UseTable) );
252
253         if (GetNextHashPos(RSSAggr->Messages,
254                            RSSAggr->Pos,
255                            &len, &Key,
256                            (void**) &RSSAggr->ThisMsg))
257                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
258         else
259                 return eAbort;
260 }
261
262 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
263 {
264         const char *Key;
265         long len;
266         struct cdbdata *cdbut;
267         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
268
269         /* Find out if we've already seen this item */
270         strcpy(Ctx->ThisMsg->ut.ut_msgid,
271                ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
272         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
273
274         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
275 #ifndef DEBUG_RSS
276         if (cdbut != NULL) {
277                 /* Item has already been seen */
278                 EVRSSC_syslog(LOG_DEBUG,
279                           "%s has already been seen\n",
280                           ChrPtr(Ctx->ThisMsg->MsgGUID));
281                 cdb_free(cdbut);
282
283                 /* rewrite the record anyway, to update the timestamp */
284                 cdb_store(CDB_USETABLE,
285                           SKEY(Ctx->ThisMsg->MsgGUID),
286                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
287
288                 if (GetNextHashPos(Ctx->Messages,
289                                    Ctx->Pos,
290                                    &len, &Key,
291                                    (void**) &Ctx->ThisMsg))
292                         return NextDBOperation(
293                                 IO,
294                                 RSS_FetchNetworkUsetableEntry);
295                 else
296                         return eAbort;
297         }
298         else
299 #endif
300         {
301                 NextDBOperation(IO, RSSSaveMessage);
302                 return eSendMore;
303         }
304 }
305
306 /*
307  * Begin a feed parse
308  */
309 int rss_do_fetching(rss_aggregator *RSSAggr)
310 {
311         AsyncIO         *IO = &RSSAggr->IO;
312         rss_item *ri;
313         time_t now;
314
315         now = time(NULL);
316
317         if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
318                 return 0;
319
320         ri = (rss_item*) malloc(sizeof(rss_item));
321         memset(ri, 0, sizeof(rss_item));
322         RSSAggr->Item = ri;
323
324         if (! InitcURLIOStruct(&RSSAggr->IO,
325                                RSSAggr,
326                                "Citadel RSS Client",
327                                RSSAggregator_ParseReply,
328                                RSSAggregator_Terminate,
329                                RSSAggregator_TerminateDB,
330                                RSSAggregator_ShutdownAbort))
331         {
332                 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
333                 return 0;
334         }
335
336         safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
337                     ChrPtr(RSSAggr->Url),
338                     sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
339
340         EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
341         ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
342         CurlPrepareURL(RSSAggr->IO.ConnectMe);
343
344         QueueCurlContext(&RSSAggr->IO);
345         return 1;
346 }
347
348 /*
349  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
350  */
351 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
352 {
353         StrBuf *CfgData=NULL;
354         StrBuf *CfgType;
355         StrBuf *Line;
356         rss_room_counter *Count = NULL;
357         struct stat statbuf;
358         char filename[PATH_MAX];
359         int fd;
360         int Done;
361         rss_aggregator *RSSAggr = NULL;
362         rss_aggregator *use_this_RSSAggr = NULL;
363         void *vptr;
364         const char *CfgPtr, *lPtr;
365         const char *Err;
366
367         pthread_mutex_lock(&RSSQueueMutex);
368         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
369         {
370                 EVRSSQ_syslog(LOG_DEBUG,
371                               "rssclient: [%ld] %s already in progress.\n",
372                               qrbuf->QRnumber,
373                               qrbuf->QRname);
374                 pthread_mutex_unlock(&RSSQueueMutex);
375                 return;
376         }
377         pthread_mutex_unlock(&RSSQueueMutex);
378
379         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
380
381         if (server_shutting_down)
382                 return;
383
384         /* Only do net processing for rooms that have netconfigs */
385         fd = open(filename, 0);
386         if (fd <= 0) {
387                 /* syslog(LOG_DEBUG,
388                    "rssclient: %s no config.\n",
389                    qrbuf->QRname); */
390                 return;
391         }
392
393         if (server_shutting_down)
394                 return;
395
396         if (fstat(fd, &statbuf) == -1) {
397                 EVRSSQ_syslog(LOG_DEBUG,
398                               "ERROR: could not stat configfile '%s' - %s\n",
399                               filename,
400                               strerror(errno));
401                 return;
402         }
403
404         if (server_shutting_down)
405                 return;
406
407         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
408
409         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
410                 close(fd);
411                 FreeStrBuf(&CfgData);
412                 EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s<br>\n",
413                               filename, strerror(errno));
414                 return;
415         }
416         close(fd);
417         if (server_shutting_down)
418                 return;
419
420         CfgPtr = NULL;
421         CfgType = NewStrBuf();
422         Line = NewStrBufPlain(NULL, StrLength(CfgData));
423         Done = 0;
424         while (!Done)
425         {
426                 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
427                 if (StrLength(Line) > 0)
428                 {
429                         lPtr = NULL;
430                         StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
431                         if (!strcasecmp("rssclient", ChrPtr(CfgType)))
432                         {
433                                 if (Count == NULL)
434                                 {
435                                         Count = malloc(
436                                                 sizeof(rss_room_counter));
437                                         Count->count = 0;
438                                 }
439                                 Count->count ++;
440                                 RSSAggr = (rss_aggregator *) malloc(
441                                         sizeof(rss_aggregator));
442
443                                 memset (RSSAggr, 0, sizeof(rss_aggregator));
444                                 RSSAggr->QRnumber = qrbuf->QRnumber;
445                                 RSSAggr->roomlist_parts = 1;
446                                 RSSAggr->Url = NewStrBuf();
447
448                                 StrBufExtract_NextToken(RSSAggr->Url,
449                                                         Line,
450                                                         &lPtr,
451                                                         '|');
452
453                                 pthread_mutex_lock(&RSSQueueMutex);
454                                 GetHash(RSSFetchUrls,
455                                         SKEY(RSSAggr->Url),
456                                         &vptr);
457
458                                 use_this_RSSAggr = (rss_aggregator *)vptr;
459                                 if (use_this_RSSAggr != NULL)
460                                 {
461                                         long *QRnumber;
462                                         StrBufAppendBufPlain(
463                                                 use_this_RSSAggr->rooms,
464                                                 qrbuf->QRname,
465                                                 -1, 0);
466                                         if (use_this_RSSAggr->roomlist_parts==1)
467                                         {
468                                                 use_this_RSSAggr->OtherQRnumbers
469                                                         = NewHash(1, lFlathash);
470                                         }
471                                         QRnumber = (long*)malloc(sizeof(long));
472                                         *QRnumber = qrbuf->QRnumber;
473                                         Put(use_this_RSSAggr->OtherQRnumbers,
474                                             LKEY(qrbuf->QRnumber),
475                                             QRnumber,
476                                             NULL);
477                                         use_this_RSSAggr->roomlist_parts++;
478
479                                         pthread_mutex_unlock(&RSSQueueMutex);
480
481                                         FreeStrBuf(&RSSAggr->Url);
482                                         free(RSSAggr);
483                                         RSSAggr = NULL;
484                                         continue;
485                                 }
486                                 pthread_mutex_unlock(&RSSQueueMutex);
487
488                                 RSSAggr->ItemType = RSS_UNSET;
489
490                                 RSSAggr->rooms = NewStrBufPlain(
491                                         qrbuf->QRname, -1);
492
493                                 pthread_mutex_lock(&RSSQueueMutex);
494
495                                 Put(RSSFetchUrls,
496                                     SKEY(RSSAggr->Url),
497                                     RSSAggr,
498                                     DeleteRssCfg);
499
500                                 pthread_mutex_unlock(&RSSQueueMutex);
501                         }
502                 }
503         }
504         if (Count != NULL)
505         {
506                 Count->QRnumber = qrbuf->QRnumber;
507                 pthread_mutex_lock(&RSSQueueMutex);
508                 EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n",
509                               qrbuf->QRnumber, qrbuf->QRname);
510                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
511                 pthread_mutex_unlock(&RSSQueueMutex);
512         }
513         FreeStrBuf(&CfgData);
514         FreeStrBuf(&CfgType);
515         FreeStrBuf(&Line);
516 }
517
518 /*
519  * Scan for rooms that have RSS client requests configured
520  */
521 void rssclient_scan(void) {
522         int RSSRoomCount, RSSCount;
523         rss_aggregator *rptr = NULL;
524         void *vrptr = NULL;
525         HashPos *it;
526         long len;
527         const char *Key;
528         time_t now = time(NULL);
529
530         /* Run no more than once every 15 minutes. */
531         if ((now - last_run) < 900) {
532                 EVRSSQ_syslog(LOG_DEBUG,
533                               "Client: polling interval not yet reached; last run was %ldm%lds ago",
534                               ((now - last_run) / 60),
535                               ((now - last_run) % 60)
536                 );
537                 return;
538         }
539
540         /*
541          * This is a simple concurrency check to make sure only one rssclient
542          * run is done at a time.
543          */
544         pthread_mutex_lock(&RSSQueueMutex);
545         RSSCount = GetCount(RSSFetchUrls);
546         RSSRoomCount = GetCount(RSSQueueRooms);
547         pthread_mutex_unlock(&RSSQueueMutex);
548
549         if ((RSSRoomCount > 0) || (RSSCount > 0)) {
550                 EVRSSQ_syslog(LOG_DEBUG,
551                               "rssclient: concurrency check failed; %d rooms and %d url's are queued",
552                               RSSRoomCount, RSSCount
553                         );
554                 return;
555         }
556
557         become_session(&rss_CC);
558         EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
559         CtdlForEachRoom(rssclient_scan_room, NULL);
560
561         pthread_mutex_lock(&RSSQueueMutex);
562
563         it = GetNewHashPos(RSSFetchUrls, 0);
564         while (!server_shutting_down &&
565                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
566                (vrptr != NULL)) {
567                 rptr = (rss_aggregator *)vrptr;
568                 if (!rss_do_fetching(rptr))
569                         UnlinkRSSAggregator(rptr);
570         }
571         DeleteHashPos(&it);
572         pthread_mutex_unlock(&RSSQueueMutex);
573
574         EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
575         return;
576 }
577
578 void rss_cleanup(void)
579 {
580         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
581         DeleteHash(&RSSFetchUrls);
582         DeleteHash(&RSSQueueRooms);
583 }
584
585 void LogDebugEnableRSSClient(const int n)
586 {
587         RSSClientDebugEnabled = n;
588 }
589
590 CTDL_MODULE_INIT(rssclient)
591 {
592         if (threading)
593         {
594                 CtdlFillSystemContext(&rss_CC, "rssclient");
595                 pthread_mutex_init(&RSSQueueMutex, NULL);
596                 RSSQueueRooms = NewHash(1, lFlathash);
597                 RSSFetchUrls = NewHash(1, NULL);
598                 syslog(LOG_INFO, "%s\n", curl_version());
599                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
600                 CtdlRegisterEVCleanupHook(rss_cleanup);
601                 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
602         }
603         return "rssclient";
604 }