Eventhandling: fix shutdownhandlers
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2012 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71 int RSSClientDebugEnabled = 0;
72 #define N ((rss_aggregator*)IO->Data)->QRnumber
73
74 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
75
76 #define EVRSSC_syslog(LEVEL, FORMAT, ...)                               \
77         DBGLOG(LEVEL) syslog(LEVEL,                                     \
78                              "IO[%ld]CC[%d][%ld]RSS" FORMAT,            \
79                              IO->ID, CCID, N, __VA_ARGS__)
80
81 #define EVRSSCM_syslog(LEVEL, FORMAT)                                   \
82         DBGLOG(LEVEL) syslog(LEVEL,                                     \
83                              "IO[%ld]CC[%d][%ld]RSS" FORMAT,            \
84                              IO->ID, CCID, N)
85
86 #define EVRSSQ_syslog(LEVEL, FORMAT, ...)                               \
87         DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT,                       \
88                              __VA_ARGS__)
89 #define EVRSSQM_syslog(LEVEL, FORMAT)                   \
90         DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
91
92 #define EVRSSCSM_syslog(LEVEL, FORMAT)                                  \
93         DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT,           \
94                              IO->ID, N)
95
96 void DeleteRoomReference(long QRnumber)
97 {
98         HashPos *At;
99         long HKLen;
100         const char *HK;
101         void *vData = NULL;
102         rss_room_counter *pRoomC;
103
104         At = GetNewHashPos(RSSQueueRooms, 0);
105
106         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
107         {
108                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
109                 if (vData != NULL)
110                 {
111                         pRoomC = (rss_room_counter *) vData;
112                         pRoomC->count --;
113                         if (pRoomC->count == 0)
114                                 DeleteEntryFromHash(RSSQueueRooms, At);
115                 }
116         }
117         DeleteHashPos(&At);
118 }
119
120 void UnlinkRooms(rss_aggregator *RSSAggr)
121 {
122         DeleteRoomReference(RSSAggr->QRnumber);
123         if (RSSAggr->OtherQRnumbers != NULL)
124         {
125                 long HKLen;
126                 const char *HK;
127                 HashPos *At;
128                 void *vData;
129
130                 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
131                 while (! server_shutting_down &&
132                        GetNextHashPos(RSSAggr->OtherQRnumbers,
133                                       At,
134                                       &HKLen, &HK,
135                                       &vData) &&
136                        (vData != NULL))
137                 {
138                         long *lData = (long*) vData;
139                         DeleteRoomReference(*lData);
140                 }
141
142                 DeleteHashPos(&At);
143         }
144 }
145
146 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
147 {
148         HashPos *At;
149
150         pthread_mutex_lock(&RSSQueueMutex);
151         UnlinkRooms(RSSAggr);
152
153         At = GetNewHashPos(RSSFetchUrls, 0);
154         if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
155         {
156                 DeleteEntryFromHash(RSSFetchUrls, At);
157         }
158         DeleteHashPos(&At);
159         last_run = time(NULL);
160         pthread_mutex_unlock(&RSSQueueMutex);
161 }
162
163 void DeleteRssCfg(void *vptr)
164 {
165         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
166         AsyncIO *IO = &RSSAggr->IO;
167         EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
168
169         FreeStrBuf(&RSSAggr->Url);
170         FreeStrBuf(&RSSAggr->rooms);
171         FreeStrBuf(&RSSAggr->CData);
172         FreeStrBuf(&RSSAggr->Key);
173         DeleteHash(&RSSAggr->OtherQRnumbers);
174
175         DeleteHashPos (&RSSAggr->Pos);
176         DeleteHash (&RSSAggr->Messages);
177         if (RSSAggr->recp.recp_room != NULL)
178                 free(RSSAggr->recp.recp_room);
179
180
181         if (RSSAggr->Item != NULL)
182         {
183                 flush_rss_item(RSSAggr->Item);
184
185                 free(RSSAggr->Item);
186         }
187
188         FreeAsyncIOContents(&RSSAggr->IO);
189         memset(RSSAggr, 0, sizeof(rss_aggregator));
190         free(RSSAggr);
191 }
192
193 eNextState RSSAggregator_Terminate(AsyncIO *IO)
194 {
195         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
196
197         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
198
199         StopCurlWatchers(IO);
200         UnlinkRSSAggregator(RSSAggr);
201         return eAbort;
202 }
203
204 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
205 {
206         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
207
208         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
209
210
211         StopDBWatchers(&RSSAggr->IO);
212         UnlinkRSSAggregator(RSSAggr);
213         return eAbort;
214 }
215
216 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
217 {
218         const char *pUrl;
219         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
220
221         pUrl = IO->ConnectMe->PlainUrl;
222         if (pUrl == NULL)
223                 pUrl = "";
224
225         EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
226
227
228         UnlinkRSSAggregator(RSSAggr);
229         return eAbort;
230 }
231
232 eNextState RSSSaveMessage(AsyncIO *IO)
233 {
234         long len;
235         const char *Key;
236         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
237
238         RSSAggr->ThisMsg->Msg.cm_fields['M'] =
239                 SmashStrBuf(&RSSAggr->ThisMsg->Message);
240
241         CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
242
243         /* write the uidl to the use table so we don't store this item again */
244         cdb_store(CDB_USETABLE,
245                   SKEY(RSSAggr->ThisMsg->MsgGUID),
246                   &RSSAggr->ThisMsg->ut,
247                   sizeof(struct UseTable) );
248
249         if (GetNextHashPos(RSSAggr->Messages,
250                            RSSAggr->Pos,
251                            &len, &Key,
252                            (void**) &RSSAggr->ThisMsg))
253                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
254         else
255                 return eAbort;
256 }
257
258 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
259 {
260         const char *Key;
261         long len;
262         struct cdbdata *cdbut;
263         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
264
265         /* Find out if we've already seen this item */
266         strcpy(Ctx->ThisMsg->ut.ut_msgid,
267                ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
268         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
269
270         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
271 #ifndef DEBUG_RSS
272         if (cdbut != NULL) {
273                 /* Item has already been seen */
274                 EVRSSC_syslog(LOG_DEBUG,
275                           "%s has already been seen\n",
276                           ChrPtr(Ctx->ThisMsg->MsgGUID));
277                 cdb_free(cdbut);
278
279                 /* rewrite the record anyway, to update the timestamp */
280                 cdb_store(CDB_USETABLE,
281                           SKEY(Ctx->ThisMsg->MsgGUID),
282                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
283
284                 if (GetNextHashPos(Ctx->Messages,
285                                    Ctx->Pos,
286                                    &len, &Key,
287                                    (void**) &Ctx->ThisMsg))
288                         return NextDBOperation(
289                                 IO,
290                                 RSS_FetchNetworkUsetableEntry);
291                 else
292                         return eAbort;
293         }
294         else
295 #endif
296         {
297                 NextDBOperation(IO, RSSSaveMessage);
298                 return eSendMore;
299         }
300 }
301
302 /*
303  * Begin a feed parse
304  */
305 int rss_do_fetching(rss_aggregator *RSSAggr)
306 {
307         AsyncIO         *IO = &RSSAggr->IO;
308         rss_item *ri;
309         time_t now;
310
311         now = time(NULL);
312
313         if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
314                 return 0;
315
316         ri = (rss_item*) malloc(sizeof(rss_item));
317         memset(ri, 0, sizeof(rss_item));
318         RSSAggr->Item = ri;
319
320         if (! InitcURLIOStruct(&RSSAggr->IO,
321                                RSSAggr,
322                                "Citadel RSS Client",
323                                RSSAggregator_ParseReply,
324                                RSSAggregator_Terminate,
325                                RSSAggregator_TerminateDB,
326                                RSSAggregator_ShutdownAbort))
327         {
328                 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
329                 return 0;
330         }
331
332         safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
333                     ChrPtr(RSSAggr->Url),
334                     sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
335
336         EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
337         ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
338         CurlPrepareURL(RSSAggr->IO.ConnectMe);
339
340         QueueCurlContext(&RSSAggr->IO);
341         return 1;
342 }
343
344 /*
345  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
346  */
347 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
348 {
349         StrBuf *CfgData=NULL;
350         StrBuf *CfgType;
351         StrBuf *Line;
352         rss_room_counter *Count = NULL;
353         struct stat statbuf;
354         char filename[PATH_MAX];
355         int fd;
356         int Done;
357         rss_aggregator *RSSAggr = NULL;
358         rss_aggregator *use_this_RSSAggr = NULL;
359         void *vptr;
360         const char *CfgPtr, *lPtr;
361         const char *Err;
362
363         pthread_mutex_lock(&RSSQueueMutex);
364         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
365         {
366                 EVRSSQ_syslog(LOG_DEBUG,
367                               "rssclient: [%ld] %s already in progress.\n",
368                               qrbuf->QRnumber,
369                               qrbuf->QRname);
370                 pthread_mutex_unlock(&RSSQueueMutex);
371                 return;
372         }
373         pthread_mutex_unlock(&RSSQueueMutex);
374
375         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
376
377         if (server_shutting_down)
378                 return;
379
380         /* Only do net processing for rooms that have netconfigs */
381         fd = open(filename, 0);
382         if (fd <= 0) {
383                 /* syslog(LOG_DEBUG,
384                    "rssclient: %s no config.\n",
385                    qrbuf->QRname); */
386                 return;
387         }
388
389         if (server_shutting_down)
390                 return;
391
392         if (fstat(fd, &statbuf) == -1) {
393                 EVRSSQ_syslog(LOG_DEBUG,
394                               "ERROR: could not stat configfile '%s' - %s\n",
395                               filename,
396                               strerror(errno));
397                 return;
398         }
399
400         if (server_shutting_down)
401                 return;
402
403         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
404
405         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
406                 close(fd);
407                 FreeStrBuf(&CfgData);
408                 EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s<br>\n",
409                               filename, strerror(errno));
410                 return;
411         }
412         close(fd);
413         if (server_shutting_down)
414                 return;
415
416         CfgPtr = NULL;
417         CfgType = NewStrBuf();
418         Line = NewStrBufPlain(NULL, StrLength(CfgData));
419         Done = 0;
420         while (!Done)
421         {
422                 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
423                 if (StrLength(Line) > 0)
424                 {
425                         lPtr = NULL;
426                         StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
427                         if (!strcasecmp("rssclient", ChrPtr(CfgType)))
428                         {
429                                 if (Count == NULL)
430                                 {
431                                         Count = malloc(
432                                                 sizeof(rss_room_counter));
433                                         Count->count = 0;
434                                 }
435                                 Count->count ++;
436                                 RSSAggr = (rss_aggregator *) malloc(
437                                         sizeof(rss_aggregator));
438
439                                 memset (RSSAggr, 0, sizeof(rss_aggregator));
440                                 RSSAggr->QRnumber = qrbuf->QRnumber;
441                                 RSSAggr->roomlist_parts = 1;
442                                 RSSAggr->Url = NewStrBuf();
443
444                                 StrBufExtract_NextToken(RSSAggr->Url,
445                                                         Line,
446                                                         &lPtr,
447                                                         '|');
448
449                                 pthread_mutex_lock(&RSSQueueMutex);
450                                 GetHash(RSSFetchUrls,
451                                         SKEY(RSSAggr->Url),
452                                         &vptr);
453
454                                 use_this_RSSAggr = (rss_aggregator *)vptr;
455                                 if (use_this_RSSAggr != NULL)
456                                 {
457                                         long *QRnumber;
458                                         StrBufAppendBufPlain(
459                                                 use_this_RSSAggr->rooms,
460                                                 qrbuf->QRname,
461                                                 -1, 0);
462                                         if (use_this_RSSAggr->roomlist_parts==1)
463                                         {
464                                                 use_this_RSSAggr->OtherQRnumbers
465                                                         = NewHash(1, lFlathash);
466                                         }
467                                         QRnumber = (long*)malloc(sizeof(long));
468                                         *QRnumber = qrbuf->QRnumber;
469                                         Put(use_this_RSSAggr->OtherQRnumbers,
470                                             LKEY(qrbuf->QRnumber),
471                                             QRnumber,
472                                             NULL);
473                                         use_this_RSSAggr->roomlist_parts++;
474
475                                         pthread_mutex_unlock(&RSSQueueMutex);
476
477                                         FreeStrBuf(&RSSAggr->Url);
478                                         free(RSSAggr);
479                                         RSSAggr = NULL;
480                                         continue;
481                                 }
482                                 pthread_mutex_unlock(&RSSQueueMutex);
483
484                                 RSSAggr->ItemType = RSS_UNSET;
485
486                                 RSSAggr->rooms = NewStrBufPlain(
487                                         qrbuf->QRname, -1);
488
489                                 pthread_mutex_lock(&RSSQueueMutex);
490
491                                 Put(RSSFetchUrls,
492                                     SKEY(RSSAggr->Url),
493                                     RSSAggr,
494                                     DeleteRssCfg);
495
496                                 pthread_mutex_unlock(&RSSQueueMutex);
497                         }
498                 }
499         }
500         if (Count != NULL)
501         {
502                 Count->QRnumber = qrbuf->QRnumber;
503                 pthread_mutex_lock(&RSSQueueMutex);
504                 EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n",
505                               qrbuf->QRnumber, qrbuf->QRname);
506                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
507                 pthread_mutex_unlock(&RSSQueueMutex);
508         }
509         FreeStrBuf(&CfgData);
510         FreeStrBuf(&CfgType);
511         FreeStrBuf(&Line);
512 }
513
514 /*
515  * Scan for rooms that have RSS client requests configured
516  */
517 void rssclient_scan(void) {
518         int RSSRoomCount, RSSCount;
519         rss_aggregator *rptr = NULL;
520         void *vrptr = NULL;
521         HashPos *it;
522         long len;
523         const char *Key;
524         time_t now = time(NULL);
525
526         /* Run no more than once every 15 minutes. */
527         if ((now - last_run) < 900) {
528                 EVRSSQ_syslog(LOG_DEBUG,
529                               "Client: polling interval not yet reached; last run was %ldm%lds ago",
530                               ((now - last_run) / 60),
531                               ((now - last_run) % 60)
532                 );
533                 return;
534         }
535
536         /*
537          * This is a simple concurrency check to make sure only one rssclient
538          * run is done at a time.
539          */
540         pthread_mutex_lock(&RSSQueueMutex);
541         RSSCount = GetCount(RSSFetchUrls);
542         RSSRoomCount = GetCount(RSSQueueRooms);
543         pthread_mutex_unlock(&RSSQueueMutex);
544
545         if ((RSSRoomCount > 0) || (RSSCount > 0)) {
546                 EVRSSQ_syslog(LOG_DEBUG,
547                               "rssclient: concurrency check failed; %d rooms and %d url's are queued",
548                               RSSRoomCount, RSSCount
549                         );
550                 return;
551         }
552
553         become_session(&rss_CC);
554         EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
555         CtdlForEachRoom(rssclient_scan_room, NULL);
556
557         pthread_mutex_lock(&RSSQueueMutex);
558
559         it = GetNewHashPos(RSSFetchUrls, 0);
560         while (!server_shutting_down &&
561                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
562                (vrptr != NULL)) {
563                 rptr = (rss_aggregator *)vrptr;
564                 if (!rss_do_fetching(rptr))
565                         UnlinkRSSAggregator(rptr);
566         }
567         DeleteHashPos(&it);
568         pthread_mutex_unlock(&RSSQueueMutex);
569
570         EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
571         return;
572 }
573
574 void rss_cleanup(void)
575 {
576         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
577         DeleteHash(&RSSFetchUrls);
578         DeleteHash(&RSSQueueRooms);
579 }
580
581 void LogDebugEnableRSSClient(const int n)
582 {
583         RSSClientDebugEnabled = n;
584 }
585
586 CTDL_MODULE_INIT(rssclient)
587 {
588         if (threading)
589         {
590                 CtdlFillSystemContext(&rss_CC, "rssclient");
591                 pthread_mutex_init(&RSSQueueMutex, NULL);
592                 RSSQueueRooms = NewHash(1, lFlathash);
593                 RSSFetchUrls = NewHash(1, NULL);
594                 syslog(LOG_INFO, "%s\n", curl_version());
595                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
596                 CtdlRegisterEVCleanupHook(rss_cleanup);
597                 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
598         }
599         return "rssclient";
600 }