Style cleanup
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2010 by the citadel.org team
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>

#if TIME_WITH_SYS_TIME
# include <sys/time.h>
# include <time.h>
#else
# if HAVE_SYS_TIME_H
#include <sys/time.h>
# else
#include <time.h>
# endif
#endif

#include <ctype.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <expat.h>
#include <curl/curl.h>
#include <libcitadel.h>
#include "citadel.h"
#include "server.h"
#include "citserver.h"
#include "support.h"
#include "config.h"
#include "threads.h"
#include "ctdl_module.h"
#include "msgbase.h"
#include "parsedate.h"
#include "database.h"
#include "citadel_dirs.h"
#include "md5.h"
#include "context.h"
#include "event_client.h"
#include "rss_atom_parser.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
59
60
/*
 * NOTE(review): these three constants are not referenced anywhere else in
 * this file; confirm whether another translation unit still needs them.
 */
#define TMP_MSGDATA 0xFF
#define TMP_SHORTER_URL_OFFSET 0xFE
#define TMP_SHORTER_URLS 0xFD

/*
 * Throttle timestamp: rssclient_scan() returns early while fewer than 900
 * seconds have elapsed since this time.  UnlinkRSSAggregator() sets it to
 * the current time.
 */
time_t last_run = 0L;

pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
/* keyed by room number (LKEY(QRnumber)); values are rss_room_counter */
HashList *RSSQueueRooms = NULL; /* rss_room_counter */
/* keyed by feed URL (SKEY(Url)); values are rss_aggregator, freed by DeleteRssCfg */
HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/

/* forward declarations: both are passed to InitcURLIOStruct() in rss_do_fetching() */
eNextState RSSAggregator_Terminate(AsyncIO *IO);
eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
/* system context: filled at module init, entered via become_session() in rssclient_scan() */
struct CitContext rss_CC;

/* NOTE(review): not referenced elsewhere in this file; possibly a leftover */
struct rssnetcfg *rnclist = NULL;
75 struct rssnetcfg *rnclist = NULL;
76
77
78 void DeleteRoomReference(long QRnumber)
79 {
80         HashPos *At;
81         long HKLen;
82         const char *HK;
83         void *vData = NULL;
84         rss_room_counter *pRoomC;
85
86         At = GetNewHashPos(RSSQueueRooms, 0);
87
88         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
89         {
90                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
91                 if (vData != NULL)
92                 {
93                         pRoomC = (rss_room_counter *) vData;
94                         pRoomC->count --;
95                         if (pRoomC->count == 0)
96                                 DeleteEntryFromHash(RSSQueueRooms, At);
97                 }
98         }
99         DeleteHashPos(&At);
100 }
101
102 void UnlinkRooms(rss_aggregator *Cfg)
103 {
104         DeleteRoomReference(Cfg->QRnumber);
105         if (Cfg->OtherQRnumbers != NULL)
106         {
107                 long HKLen;
108                 const char *HK;
109                 HashPos *At;
110                 void *vData;
111
112                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
113                 while (! server_shutting_down &&
114                        GetNextHashPos(Cfg->OtherQRnumbers,
115                                       At,
116                                       &HKLen, &HK,
117                                       &vData) &&
118                        (vData != NULL))
119                 {
120                         long *lData = (long*) vData;
121                         DeleteRoomReference(*lData);
122                 }
123
124                 DeleteHashPos(&At);
125         }
126 }
127
128 void UnlinkRSSAggregator(rss_aggregator *Cfg)
129 {
130         HashPos *At;
131
132         UnlinkRooms(Cfg);
133
134         At = GetNewHashPos(RSSFetchUrls, 0);
135         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
136         {
137                 DeleteEntryFromHash(RSSFetchUrls, At);
138         }
139         DeleteHashPos(&At);
140         last_run = time(NULL);
141 }
142
143 void DeleteRssCfg(void *vptr)
144 {
145         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
146         AsyncIO *IO = &RSSAggr->IO;
147         EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
148
149         FreeStrBuf(&RSSAggr->Url);
150         FreeStrBuf(&RSSAggr->rooms);
151         FreeStrBuf(&RSSAggr->CData);
152         FreeStrBuf(&RSSAggr->Key);
153         DeleteHash(&RSSAggr->OtherQRnumbers);
154
155         DeleteHashPos (&RSSAggr->Pos);
156         DeleteHash (&RSSAggr->Messages);
157         if (RSSAggr->recp.recp_room != NULL)
158                 free(RSSAggr->recp.recp_room);
159
160
161         if (RSSAggr->Item != NULL)
162         {
163                 flush_rss_item(RSSAggr->Item);
164
165                 free(RSSAggr->Item);
166         }
167
168         FreeAsyncIOContents(&RSSAggr->IO);
169         free(RSSAggr);
170 }
171
172 eNextState RSSAggregator_Terminate(AsyncIO *IO)
173 {
174         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
175
176         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
177
178
179         UnlinkRSSAggregator(RSSAggr);
180         return eAbort;
181 }
182
183 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
184 {
185         const char *pUrl;
186         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
187
188         pUrl = IO->ConnectMe->PlainUrl;
189         if (pUrl == NULL)
190                 pUrl = "";
191
192         EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
193
194
195         UnlinkRSSAggregator(RSSAggr);
196         return eAbort;
197 }
198
199
200 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
201 {
202         return eAbort; ///TODO
203 }
204
205 eNextState RSSSaveMessage(AsyncIO *IO)
206 {
207         long len;
208         const char *Key;
209         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
210
211         RSSAggr->ThisMsg->Msg.cm_fields['M'] =
212                 SmashStrBuf(&RSSAggr->ThisMsg->Message);
213
214         CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
215
216         /* write the uidl to the use table so we don't store this item again */
217         cdb_store(CDB_USETABLE,
218                   SKEY(RSSAggr->ThisMsg->MsgGUID),
219                   &RSSAggr->ThisMsg->ut,
220                   sizeof(struct UseTable) );
221
222         if (GetNextHashPos(RSSAggr->Messages,
223                            RSSAggr->Pos,
224                            &len, &Key,
225                            (void**) &RSSAggr->ThisMsg))
226                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
227         else
228                 return eAbort;
229 }
230
231 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
232 {
233         const char *Key;
234         long len;
235         struct cdbdata *cdbut;
236         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
237
238         /* Find out if we've already seen this item */
239         strcpy(Ctx->ThisMsg->ut.ut_msgid,
240                ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
241         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
242
243         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
244 #ifndef DEBUG_RSS
245         if (cdbut != NULL) {
246                 /* Item has already been seen */
247                 EV_syslog(LOG_DEBUG,
248                           "%s has already been seen\n",
249                           ChrPtr(Ctx->ThisMsg->MsgGUID));
250                 cdb_free(cdbut);
251
252                 /* rewrite the record anyway, to update the timestamp */
253                 cdb_store(CDB_USETABLE,
254                           SKEY(Ctx->ThisMsg->MsgGUID),
255                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
256
257                 if (GetNextHashPos(Ctx->Messages,
258                                    Ctx->Pos,
259                                    &len, &Key,
260                                    (void**) &Ctx->ThisMsg))
261                         return NextDBOperation(
262                                 IO,
263                                 RSS_FetchNetworkUsetableEntry);
264                 else
265                         return eAbort;
266         }
267         else
268 #endif
269         {
270                 NextDBOperation(IO, RSSSaveMessage);
271                 return eSendMore;
272         }
273 }
274
275 /*
276  * Begin a feed parse
277  */
278 int rss_do_fetching(rss_aggregator *Cfg)
279 {
280         rss_item *ri;
281         time_t now;
282
283         now = time(NULL);
284
285         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
286                 return 0;
287
288         ri = (rss_item*) malloc(sizeof(rss_item));
289         memset(ri, 0, sizeof(rss_item));
290         Cfg->Item = ri;
291
292         if (! InitcURLIOStruct(&Cfg->IO,
293                                Cfg,
294                                "Citadel RSS Client",
295                                RSSAggregator_ParseReply,
296                                RSSAggregator_Terminate,
297                                RSSAggregator_ShutdownAbort))
298         {
299                 syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
300                 return 0;
301         }
302
303         safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host,
304                     ChrPtr(Cfg->Url),
305                     sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host));
306
307         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
308         ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80);
309         CurlPrepareURL(Cfg->IO.ConnectMe);
310
311         QueueCurlContext(&Cfg->IO);
312         return 1;
313 }
314
315 /*
316  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
317  */
318 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
319 {
320         StrBuf *CfgData=NULL;
321         StrBuf *CfgType;
322         StrBuf *Line;
323         rss_room_counter *Count = NULL;
324         struct stat statbuf;
325         char filename[PATH_MAX];
326         int fd;
327         int Done;
328         rss_aggregator *RSSAggr = NULL;
329         rss_aggregator *use_this_RSSAggr = NULL;
330         void *vptr;
331         const char *CfgPtr, *lPtr;
332         const char *Err;
333
334         pthread_mutex_lock(&RSSQueueMutex);
335         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
336         {
337                 syslog(LOG_DEBUG,
338                        "rssclient: [%ld] %s already in progress.\n",
339                        qrbuf->QRnumber,
340                        qrbuf->QRname);
341                 pthread_mutex_unlock(&RSSQueueMutex);
342                 return;
343         }
344         pthread_mutex_unlock(&RSSQueueMutex);
345
346         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
347
348         if (server_shutting_down)
349                 return;
350
351         /* Only do net processing for rooms that have netconfigs */
352         fd = open(filename, 0);
353         if (fd <= 0) {
354                 /* syslog(LOG_DEBUG,
355                    "rssclient: %s no config.\n",
356                    qrbuf->QRname); */
357                 return;
358         }
359
360         if (server_shutting_down)
361                 return;
362
363         if (fstat(fd, &statbuf) == -1) {
364                 syslog(LOG_DEBUG,
365                        "ERROR: could not stat configfile '%s' - %s\n",
366                        filename,
367                        strerror(errno));
368                 return;
369         }
370
371         if (server_shutting_down)
372                 return;
373
374         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
375
376         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
377                 close(fd);
378                 FreeStrBuf(&CfgData);
379                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
380                        filename, strerror(errno));
381                 return;
382         }
383         close(fd);
384         if (server_shutting_down)
385                 return;
386
387         CfgPtr = NULL;
388         CfgType = NewStrBuf();
389         Line = NewStrBufPlain(NULL, StrLength(CfgData));
390         Done = 0;
391         while (!Done)
392         {
393                 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
394                 if (StrLength(Line) > 0)
395                 {
396                         lPtr = NULL;
397                         StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
398                         if (!strcasecmp("rssclient", ChrPtr(CfgType)))
399                         {
400                                 if (Count == NULL)
401                                 {
402                                         Count = malloc(
403                                                 sizeof(rss_room_counter));
404                                         Count->count = 0;
405                                 }
406                                 Count->count ++;
407                                 RSSAggr = (rss_aggregator *) malloc(
408                                         sizeof(rss_aggregator));
409
410                                 memset (RSSAggr, 0, sizeof(rss_aggregator));
411                                 RSSAggr->roomlist_parts = 1;
412                                 RSSAggr->Url = NewStrBuf();
413
414                                 StrBufExtract_NextToken(RSSAggr->Url,
415                                                         Line,
416                                                         &lPtr,
417                                                         '|');
418
419                                 pthread_mutex_lock(&RSSQueueMutex);
420                                 GetHash(RSSFetchUrls,
421                                         SKEY(RSSAggr->Url),
422                                         &vptr);
423
424                                 use_this_RSSAggr = (rss_aggregator *)vptr;
425                                 if (use_this_RSSAggr != NULL)
426                                 {
427                                         long *QRnumber;
428                                         StrBufAppendBufPlain(
429                                                 use_this_RSSAggr->rooms,
430                                                 qrbuf->QRname,
431                                                 -1, 0);
432                                         if (use_this_RSSAggr->roomlist_parts==1)
433                                         {
434                                                 use_this_RSSAggr->OtherQRnumbers
435                                                         = NewHash(1, lFlathash);
436                                         }
437                                         QRnumber = (long*)malloc(sizeof(long));
438                                         *QRnumber = qrbuf->QRnumber;
439                                         Put(use_this_RSSAggr->OtherQRnumbers,
440                                             LKEY(qrbuf->QRnumber),
441                                             QRnumber,
442                                             NULL);
443                                         use_this_RSSAggr->roomlist_parts++;
444
445                                         pthread_mutex_unlock(&RSSQueueMutex);
446
447                                         FreeStrBuf(&RSSAggr->Url);
448                                         free(RSSAggr);
449                                         RSSAggr = NULL;
450                                         continue;
451                                 }
452                                 pthread_mutex_unlock(&RSSQueueMutex);
453
454                                 RSSAggr->ItemType = RSS_UNSET;
455
456                                 RSSAggr->rooms = NewStrBufPlain(
457                                         qrbuf->QRname, -1);
458
459                                 pthread_mutex_lock(&RSSQueueMutex);
460
461                                 Put(RSSFetchUrls,
462                                     SKEY(RSSAggr->Url),
463                                     RSSAggr,
464                                     DeleteRssCfg);
465
466                                 pthread_mutex_unlock(&RSSQueueMutex);
467                         }
468                 }
469         }
470         if (Count != NULL)
471         {
472                 Count->QRnumber = qrbuf->QRnumber;
473                 pthread_mutex_lock(&RSSQueueMutex);
474                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
475                        qrbuf->QRnumber, qrbuf->QRname);
476                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
477                 pthread_mutex_unlock(&RSSQueueMutex);
478         }
479         FreeStrBuf(&CfgData);
480         FreeStrBuf(&CfgType);
481         FreeStrBuf(&Line);
482 }
483
484 /*
485  * Scan for rooms that have RSS client requests configured
486  */
487 void rssclient_scan(void) {
488         static int doing_rssclient = 0;
489         rss_aggregator *rptr = NULL;
490         void *vrptr = NULL;
491         HashPos *it;
492         long len;
493         const char *Key;
494
495         /* Run no more than once every 15 minutes. */
496         if ((time(NULL) - last_run) < 900) {
497                 return;
498         }
499
500         /*
501          * This is a simple concurrency check to make sure only one rssclient
502          * run is done at a time.We could do this with a mutex, but since we
503          * don't really require extremely fine granularity here, we'll do it
504          * with a static variable instead.
505          */
506         if (doing_rssclient) return;
507         doing_rssclient = 1;
508         if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
509                 return;
510
511         become_session(&rss_CC);
512         syslog(LOG_DEBUG, "rssclient started\n");
513         CtdlForEachRoom(rssclient_scan_room, NULL);
514
515         pthread_mutex_lock(&RSSQueueMutex);
516
517         it = GetNewHashPos(RSSFetchUrls, 0);
518         while (!server_shutting_down &&
519                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
520                (vrptr != NULL)) {
521                 rptr = (rss_aggregator *)vrptr;
522                 if (!rss_do_fetching(rptr))
523                         UnlinkRSSAggregator(rptr);
524         }
525         DeleteHashPos(&it);
526         pthread_mutex_unlock(&RSSQueueMutex);
527
528         syslog(LOG_DEBUG, "rssclient ended\n");
529         doing_rssclient = 0;
530         return;
531 }
532
533 void rss_cleanup(void)
534 {
535         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
536         DeleteHash(&RSSFetchUrls);
537         DeleteHash(&RSSQueueRooms);
538 }
539
540
541 CTDL_MODULE_INIT(rssclient)
542 {
543         if (threading)
544         {
545                 CtdlFillSystemContext(&rss_CC, "rssclient");
546                 pthread_mutex_init(&RSSQueueMutex, NULL);
547                 RSSQueueRooms = NewHash(1, lFlathash);
548                 RSSFetchUrls = NewHash(1, NULL);
549                 syslog(LOG_INFO, "%s\n", curl_version());
550                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
551                 CtdlRegisterCleanupHook(rss_cleanup);
552         }
553         return "rssclient";
554 }