6d4392393610785433efc3caad7f91067519afa5
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2012 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  * 
9  * 
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * 
17  * 
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  */
20
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #include <sys/time.h>
31 # else
32 #include <time.h>
33 # endif
34 #endif
35
36 #include <ctype.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <expat.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
44 #include "citadel.h"
45 #include "server.h"
46 #include "citserver.h"
47 #include "support.h"
48 #include "config.h"
49 #include "threads.h"
50 #include "ctdl_module.h"
51 #include "msgbase.h"
52 #include "parsedate.h"
53 #include "database.h"
54 #include "citadel_dirs.h"
55 #include "md5.h"
56 #include "context.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
59
60
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
64
65 time_t last_run = 0L;
66
67 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
68 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
69 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
70
71 eNextState RSSAggregator_Terminate(AsyncIO *IO);
72 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
73 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
74 struct CitContext rss_CC;
75
76 struct rssnetcfg *rnclist = NULL;
77
78
79 void DeleteRoomReference(long QRnumber)
80 {
81         HashPos *At;
82         long HKLen;
83         const char *HK;
84         void *vData = NULL;
85         rss_room_counter *pRoomC;
86
87         At = GetNewHashPos(RSSQueueRooms, 0);
88
89         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
90         {
91                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
92                 if (vData != NULL)
93                 {
94                         pRoomC = (rss_room_counter *) vData;
95                         pRoomC->count --;
96                         if (pRoomC->count == 0)
97                                 DeleteEntryFromHash(RSSQueueRooms, At);
98                 }
99         }
100         DeleteHashPos(&At);
101 }
102
103 void UnlinkRooms(rss_aggregator *Cfg)
104 {
105         DeleteRoomReference(Cfg->QRnumber);
106         if (Cfg->OtherQRnumbers != NULL)
107         {
108                 long HKLen;
109                 const char *HK;
110                 HashPos *At;
111                 void *vData;
112
113                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
114                 while (! server_shutting_down &&
115                        GetNextHashPos(Cfg->OtherQRnumbers,
116                                       At,
117                                       &HKLen, &HK,
118                                       &vData) &&
119                        (vData != NULL))
120                 {
121                         long *lData = (long*) vData;
122                         DeleteRoomReference(*lData);
123                 }
124
125                 DeleteHashPos(&At);
126         }
127 }
128
129 void UnlinkRSSAggregator(rss_aggregator *Cfg)
130 {
131         HashPos *At;
132
133         UnlinkRooms(Cfg);
134
135         At = GetNewHashPos(RSSFetchUrls, 0);
136         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
137         {
138                 DeleteEntryFromHash(RSSFetchUrls, At);
139         }
140         DeleteHashPos(&At);
141         last_run = time(NULL);
142 }
143
144 void DeleteRssCfg(void *vptr)
145 {
146         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
147         AsyncIO *IO = &RSSAggr->IO;
148         EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
149
150         FreeStrBuf(&RSSAggr->Url);
151         FreeStrBuf(&RSSAggr->rooms);
152         FreeStrBuf(&RSSAggr->CData);
153         FreeStrBuf(&RSSAggr->Key);
154         DeleteHash(&RSSAggr->OtherQRnumbers);
155
156         DeleteHashPos (&RSSAggr->Pos);
157         DeleteHash (&RSSAggr->Messages);
158         if (RSSAggr->recp.recp_room != NULL)
159                 free(RSSAggr->recp.recp_room);
160
161
162         if (RSSAggr->Item != NULL)
163         {
164                 flush_rss_item(RSSAggr->Item);
165
166                 free(RSSAggr->Item);
167         }
168
169         FreeAsyncIOContents(&RSSAggr->IO);
170         free(RSSAggr);
171 }
172
173 eNextState RSSAggregator_Terminate(AsyncIO *IO)
174 {
175         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
176
177         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
178
179
180         UnlinkRSSAggregator(RSSAggr);
181         return eAbort;
182 }
183
184 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
185 {
186         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
187
188         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
189
190
191         UnlinkRSSAggregator(RSSAggr);
192         return eAbort;
193 }
194
195 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
196 {
197         const char *pUrl;
198         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
199
200         pUrl = IO->ConnectMe->PlainUrl;
201         if (pUrl == NULL)
202                 pUrl = "";
203
204         EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
205
206
207         UnlinkRSSAggregator(RSSAggr);
208         return eAbort;
209 }
210
211
212 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
213 {
214         return eAbort; ///TODO
215 }
216
217 eNextState RSSSaveMessage(AsyncIO *IO)
218 {
219         long len;
220         const char *Key;
221         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
222
223         RSSAggr->ThisMsg->Msg.cm_fields['M'] =
224                 SmashStrBuf(&RSSAggr->ThisMsg->Message);
225
226         CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
227
228         /* write the uidl to the use table so we don't store this item again */
229         cdb_store(CDB_USETABLE,
230                   SKEY(RSSAggr->ThisMsg->MsgGUID),
231                   &RSSAggr->ThisMsg->ut,
232                   sizeof(struct UseTable) );
233
234         if (GetNextHashPos(RSSAggr->Messages,
235                            RSSAggr->Pos,
236                            &len, &Key,
237                            (void**) &RSSAggr->ThisMsg))
238                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
239         else
240                 return eAbort;
241 }
242
243 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
244 {
245         const char *Key;
246         long len;
247         struct cdbdata *cdbut;
248         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
249
250         /* Find out if we've already seen this item */
251         strcpy(Ctx->ThisMsg->ut.ut_msgid,
252                ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
253         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
254
255         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
256 #ifndef DEBUG_RSS
257         if (cdbut != NULL) {
258                 /* Item has already been seen */
259                 EV_syslog(LOG_DEBUG,
260                           "%s has already been seen\n",
261                           ChrPtr(Ctx->ThisMsg->MsgGUID));
262                 cdb_free(cdbut);
263
264                 /* rewrite the record anyway, to update the timestamp */
265                 cdb_store(CDB_USETABLE,
266                           SKEY(Ctx->ThisMsg->MsgGUID),
267                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
268
269                 if (GetNextHashPos(Ctx->Messages,
270                                    Ctx->Pos,
271                                    &len, &Key,
272                                    (void**) &Ctx->ThisMsg))
273                         return NextDBOperation(
274                                 IO,
275                                 RSS_FetchNetworkUsetableEntry);
276                 else
277                         return eAbort;
278         }
279         else
280 #endif
281         {
282                 NextDBOperation(IO, RSSSaveMessage);
283                 return eSendMore;
284         }
285 }
286
287 /*
288  * Begin a feed parse
289  */
290 int rss_do_fetching(rss_aggregator *Cfg)
291 {
292         rss_item *ri;
293         time_t now;
294
295         now = time(NULL);
296
297         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
298                 return 0;
299
300         ri = (rss_item*) malloc(sizeof(rss_item));
301         memset(ri, 0, sizeof(rss_item));
302         Cfg->Item = ri;
303
304         if (! InitcURLIOStruct(&Cfg->IO,
305                                Cfg,
306                                "Citadel RSS Client",
307                                RSSAggregator_ParseReply,
308                                RSSAggregator_Terminate,
309                                RSSAggregator_TerminateDB,
310                                RSSAggregator_ShutdownAbort))
311         {
312                 syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
313                 return 0;
314         }
315
316         safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host,
317                     ChrPtr(Cfg->Url),
318                     sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host));
319
320         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
321         ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80);
322         CurlPrepareURL(Cfg->IO.ConnectMe);
323
324         QueueCurlContext(&Cfg->IO);
325         return 1;
326 }
327
328 /*
329  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
330  */
331 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
332 {
333         StrBuf *CfgData=NULL;
334         StrBuf *CfgType;
335         StrBuf *Line;
336         rss_room_counter *Count = NULL;
337         struct stat statbuf;
338         char filename[PATH_MAX];
339         int fd;
340         int Done;
341         rss_aggregator *RSSAggr = NULL;
342         rss_aggregator *use_this_RSSAggr = NULL;
343         void *vptr;
344         const char *CfgPtr, *lPtr;
345         const char *Err;
346
347         pthread_mutex_lock(&RSSQueueMutex);
348         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
349         {
350                 syslog(LOG_DEBUG,
351                        "rssclient: [%ld] %s already in progress.\n",
352                        qrbuf->QRnumber,
353                        qrbuf->QRname);
354                 pthread_mutex_unlock(&RSSQueueMutex);
355                 return;
356         }
357         pthread_mutex_unlock(&RSSQueueMutex);
358
359         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
360
361         if (server_shutting_down)
362                 return;
363
364         /* Only do net processing for rooms that have netconfigs */
365         fd = open(filename, 0);
366         if (fd <= 0) {
367                 /* syslog(LOG_DEBUG,
368                    "rssclient: %s no config.\n",
369                    qrbuf->QRname); */
370                 return;
371         }
372
373         if (server_shutting_down)
374                 return;
375
376         if (fstat(fd, &statbuf) == -1) {
377                 syslog(LOG_DEBUG,
378                        "ERROR: could not stat configfile '%s' - %s\n",
379                        filename,
380                        strerror(errno));
381                 return;
382         }
383
384         if (server_shutting_down)
385                 return;
386
387         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
388
389         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
390                 close(fd);
391                 FreeStrBuf(&CfgData);
392                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
393                        filename, strerror(errno));
394                 return;
395         }
396         close(fd);
397         if (server_shutting_down)
398                 return;
399
400         CfgPtr = NULL;
401         CfgType = NewStrBuf();
402         Line = NewStrBufPlain(NULL, StrLength(CfgData));
403         Done = 0;
404         while (!Done)
405         {
406                 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
407                 if (StrLength(Line) > 0)
408                 {
409                         lPtr = NULL;
410                         StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
411                         if (!strcasecmp("rssclient", ChrPtr(CfgType)))
412                         {
413                                 if (Count == NULL)
414                                 {
415                                         Count = malloc(
416                                                 sizeof(rss_room_counter));
417                                         Count->count = 0;
418                                 }
419                                 Count->count ++;
420                                 RSSAggr = (rss_aggregator *) malloc(
421                                         sizeof(rss_aggregator));
422
423                                 memset (RSSAggr, 0, sizeof(rss_aggregator));
424                                 RSSAggr->QRnumber = qrbuf->QRnumber;
425                                 RSSAggr->roomlist_parts = 1;
426                                 RSSAggr->Url = NewStrBuf();
427
428                                 StrBufExtract_NextToken(RSSAggr->Url,
429                                                         Line,
430                                                         &lPtr,
431                                                         '|');
432
433                                 pthread_mutex_lock(&RSSQueueMutex);
434                                 GetHash(RSSFetchUrls,
435                                         SKEY(RSSAggr->Url),
436                                         &vptr);
437
438                                 use_this_RSSAggr = (rss_aggregator *)vptr;
439                                 if (use_this_RSSAggr != NULL)
440                                 {
441                                         long *QRnumber;
442                                         StrBufAppendBufPlain(
443                                                 use_this_RSSAggr->rooms,
444                                                 qrbuf->QRname,
445                                                 -1, 0);
446                                         if (use_this_RSSAggr->roomlist_parts==1)
447                                         {
448                                                 use_this_RSSAggr->OtherQRnumbers
449                                                         = NewHash(1, lFlathash);
450                                         }
451                                         QRnumber = (long*)malloc(sizeof(long));
452                                         *QRnumber = qrbuf->QRnumber;
453                                         Put(use_this_RSSAggr->OtherQRnumbers,
454                                             LKEY(qrbuf->QRnumber),
455                                             QRnumber,
456                                             NULL);
457                                         use_this_RSSAggr->roomlist_parts++;
458
459                                         pthread_mutex_unlock(&RSSQueueMutex);
460
461                                         FreeStrBuf(&RSSAggr->Url);
462                                         free(RSSAggr);
463                                         RSSAggr = NULL;
464                                         continue;
465                                 }
466                                 pthread_mutex_unlock(&RSSQueueMutex);
467
468                                 RSSAggr->ItemType = RSS_UNSET;
469
470                                 RSSAggr->rooms = NewStrBufPlain(
471                                         qrbuf->QRname, -1);
472
473                                 pthread_mutex_lock(&RSSQueueMutex);
474
475                                 Put(RSSFetchUrls,
476                                     SKEY(RSSAggr->Url),
477                                     RSSAggr,
478                                     DeleteRssCfg);
479
480                                 pthread_mutex_unlock(&RSSQueueMutex);
481                         }
482                 }
483         }
484         if (Count != NULL)
485         {
486                 Count->QRnumber = qrbuf->QRnumber;
487                 pthread_mutex_lock(&RSSQueueMutex);
488                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
489                        qrbuf->QRnumber, qrbuf->QRname);
490                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
491                 pthread_mutex_unlock(&RSSQueueMutex);
492         }
493         FreeStrBuf(&CfgData);
494         FreeStrBuf(&CfgType);
495         FreeStrBuf(&Line);
496 }
497
498 /*
499  * Scan for rooms that have RSS client requests configured
500  */
501 void rssclient_scan(void) {
502         rss_aggregator *rptr = NULL;
503         void *vrptr = NULL;
504         HashPos *it;
505         long len;
506         const char *Key;
507         time_t now = time(NULL);
508
509         /* Run no more than once every 15 minutes. */
510         if ((now - last_run) < 900) {
511                 return;
512         }
513
514         /*
515          * This is a simple concurrency check to make sure only one rssclient
516          * run is done at a time.We could do this with a mutex, but since we
517          * don't really require extremely fine granularity here, we'll do it
518          * with a static variable instead.
519          */
520
521         if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
522                 return;
523
524         become_session(&rss_CC);
525         syslog(LOG_DEBUG, "rssclient started\n");
526         CtdlForEachRoom(rssclient_scan_room, NULL);
527
528         pthread_mutex_lock(&RSSQueueMutex);
529
530         it = GetNewHashPos(RSSFetchUrls, 0);
531         while (!server_shutting_down &&
532                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
533                (vrptr != NULL)) {
534                 rptr = (rss_aggregator *)vrptr;
535                 if (!rss_do_fetching(rptr))
536                         UnlinkRSSAggregator(rptr);
537         }
538         DeleteHashPos(&it);
539         pthread_mutex_unlock(&RSSQueueMutex);
540
541         syslog(LOG_DEBUG, "rssclient ended\n");
542         return;
543 }
544
545 void rss_cleanup(void)
546 {
547         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
548         DeleteHash(&RSSFetchUrls);
549         DeleteHash(&RSSQueueRooms);
550 }
551
552
553 CTDL_MODULE_INIT(rssclient)
554 {
555         if (threading)
556         {
557                 CtdlFillSystemContext(&rss_CC, "rssclient");
558                 pthread_mutex_init(&RSSQueueMutex, NULL);
559                 RSSQueueRooms = NewHash(1, lFlathash);
560                 RSSFetchUrls = NewHash(1, NULL);
561                 syslog(LOG_INFO, "%s\n", curl_version());
562                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
563                 CtdlRegisterEVCleanupHook(rss_cleanup);
564         }
565         return "rssclient";
566 }