b55260624f29621aaade2b1abba09bb899e41ced
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2012 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71 int RSSClientDebugEnabled = 0;
72 #define N ((rss_aggregator*)IO->Data)->QRnumber
73
74 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
75
76 #define EVRSSC_syslog(LEVEL, FORMAT, ...)                               \
77         DBGLOG(LEVEL) syslog(LEVEL,                                     \
78                              "IO[%ld]CC[%d][%ld]RSS" FORMAT,            \
79                              IO->ID, CCID, N, __VA_ARGS__)
80
81 #define EVRSSCM_syslog(LEVEL, FORMAT)                                   \
82         DBGLOG(LEVEL) syslog(LEVEL,                                     \
83                              "IO[%ld]CC[%d][%ld]RSS" FORMAT,            \
84                              IO->ID, CCID, N)
85
86 #define EVRSSQ_syslog(LEVEL, FORMAT, ...)                               \
87         DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT,                       \
88                              __VA_ARGS__)
89 #define EVRSSQM_syslog(LEVEL, FORMAT)                   \
90         DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
91
92 #define EVRSSCSM_syslog(LEVEL, FORMAT)                                  \
93         DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT,           \
94                              IO->ID, N)
95
96 void DeleteRoomReference(long QRnumber)
97 {
98         HashPos *At;
99         long HKLen;
100         const char *HK;
101         void *vData = NULL;
102         rss_room_counter *pRoomC;
103
104         At = GetNewHashPos(RSSQueueRooms, 0);
105
106         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
107         {
108                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
109                 if (vData != NULL)
110                 {
111                         pRoomC = (rss_room_counter *) vData;
112                         pRoomC->count --;
113                         if (pRoomC->count == 0)
114                                 DeleteEntryFromHash(RSSQueueRooms, At);
115                 }
116         }
117         DeleteHashPos(&At);
118 }
119
120 void UnlinkRooms(rss_aggregator *RSSAggr)
121 {
122         DeleteRoomReference(RSSAggr->QRnumber);
123         if (RSSAggr->OtherQRnumbers != NULL)
124         {
125                 long HKLen;
126                 const char *HK;
127                 HashPos *At;
128                 void *vData;
129
130                 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
131                 while (! server_shutting_down &&
132                        GetNextHashPos(RSSAggr->OtherQRnumbers,
133                                       At,
134                                       &HKLen, &HK,
135                                       &vData) &&
136                        (vData != NULL))
137                 {
138                         long *lData = (long*) vData;
139                         DeleteRoomReference(*lData);
140                 }
141
142                 DeleteHashPos(&At);
143         }
144 }
145
146 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
147 {
148         HashPos *At;
149
150         pthread_mutex_lock(&RSSQueueMutex);
151         UnlinkRooms(RSSAggr);
152
153         At = GetNewHashPos(RSSFetchUrls, 0);
154         if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
155         {
156                 DeleteEntryFromHash(RSSFetchUrls, At);
157         }
158         DeleteHashPos(&At);
159         last_run = time(NULL);
160         pthread_mutex_unlock(&RSSQueueMutex);
161 }
162
163 void DeleteRssCfg(void *vptr)
164 {
165         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
166         AsyncIO *IO = &RSSAggr->IO;
167         EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
168
169         FreeStrBuf(&RSSAggr->Url);
170         FreeStrBuf(&RSSAggr->rooms);
171         FreeStrBuf(&RSSAggr->CData);
172         FreeStrBuf(&RSSAggr->Key);
173         DeleteHash(&RSSAggr->OtherQRnumbers);
174
175         DeleteHashPos (&RSSAggr->Pos);
176         DeleteHash (&RSSAggr->Messages);
177         if (RSSAggr->recp.recp_room != NULL)
178                 free(RSSAggr->recp.recp_room);
179
180
181         if (RSSAggr->Item != NULL)
182         {
183                 flush_rss_item(RSSAggr->Item);
184
185                 free(RSSAggr->Item);
186         }
187
188         FreeAsyncIOContents(&RSSAggr->IO);
189         memset(RSSAggr, 0, sizeof(rss_aggregator));
190         free(RSSAggr);
191 }
192
193 eNextState RSSAggregator_Terminate(AsyncIO *IO)
194 {
195         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
196
197         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
198
199         StopCurlWatchers(IO);
200         UnlinkRSSAggregator(RSSAggr);
201         return eAbort;
202 }
203
204 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
205 {
206         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
207
208         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
209
210
211         StopDBWatchers(&RSSAggr->IO);
212         UnlinkRSSAggregator(RSSAggr);
213         return eAbort;
214 }
215
216 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
217 {
218         const char *pUrl;
219         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
220
221         pUrl = IO->ConnectMe->PlainUrl;
222         if (pUrl == NULL)
223                 pUrl = "";
224
225         EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
226
227         StopCurlWatchers(IO);
228         UnlinkRSSAggregator(RSSAggr);
229         return eAbort;
230 }
231
232 void AppendLink(StrBuf *Message,
233                 StrBuf *link,
234                 StrBuf *LinkTitle,
235                 const char *Title)
236 {
237         if (StrLength(link) > 0)
238         {
239                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
240                 StrBufAppendBuf(Message, link, 0);
241                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
242                 if (StrLength(LinkTitle) > 0)
243                         StrBufAppendBuf(Message, LinkTitle, 0);
244                 else if ((Title != NULL) && !IsEmptyStr(Title))
245                         StrBufAppendBufPlain(Message, Title, -1, 0);
246                 else
247                         StrBufAppendBuf(Message, link, 0);
248                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
249         }
250 }
251
252
253 void rss_format_item(networker_save_message *SaveMsg)
254 {
255         StrBuf *Message;
256         int msglen = 0;
257
258         if (SaveMsg->author_or_creator != NULL) {
259
260                 char *From;
261                 StrBuf *Encoded = NULL;
262                 int FromAt;
263
264                 From = html_to_ascii(ChrPtr(SaveMsg->author_or_creator),
265                                      StrLength(SaveMsg->author_or_creator),
266                                      512, 0);
267                 StrBufPlain(SaveMsg->author_or_creator, From, -1);
268                 StrBufTrim(SaveMsg->author_or_creator);
269                 free(From);
270
271                 FromAt = strchr(ChrPtr(SaveMsg->author_or_creator), '@') != NULL;
272                 if (!FromAt && StrLength (SaveMsg->author_email) > 0)
273                 {
274                         StrBufRFC2047encode(&Encoded, SaveMsg->author_or_creator);
275                         SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded);
276                         SaveMsg->Msg.cm_fields['P'] =
277                                 SmashStrBuf(&SaveMsg->author_email);
278                 }
279                 else
280                 {
281                         if (FromAt)
282                         {
283                                 SaveMsg->Msg.cm_fields['A'] =
284                                         SmashStrBuf(&SaveMsg->author_or_creator);
285                                 SaveMsg->Msg.cm_fields['P'] =
286                                         strdup(SaveMsg->Msg.cm_fields['A']);
287                         }
288                         else
289                         {
290                                 StrBufRFC2047encode(&Encoded,
291                                                     SaveMsg->author_or_creator);
292                                 SaveMsg->Msg.cm_fields['A'] =
293                                         SmashStrBuf(&Encoded);
294                                 SaveMsg->Msg.cm_fields['P'] =
295                                         strdup("rss@localhost");
296
297                         }
298                 }
299         }
300         else {
301                 SaveMsg->Msg.cm_fields['A'] = strdup("rss");
302         }
303
304         SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME);
305         if (SaveMsg->title != NULL) {
306                 long len;
307                 char *Sbj;
308                 StrBuf *Encoded, *QPEncoded;
309
310                 QPEncoded = NULL;
311                 StrBufSpaceToBlank(SaveMsg->title);
312                 len = StrLength(SaveMsg->title);
313                 Sbj = html_to_ascii(ChrPtr(SaveMsg->title), len, 512, 0);
314                 len = strlen(Sbj);
315                 if ((len > 0) && (Sbj[len - 1] == '\n'))
316                 {
317                         len --;
318                         Sbj[len] = '\0';
319                 }
320                 Encoded = NewStrBufPlain(Sbj, len);
321                 free(Sbj);
322
323                 StrBufTrim(Encoded);
324                 StrBufRFC2047encode(&QPEncoded, Encoded);
325
326                 SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded);
327                 FreeStrBuf(&Encoded);
328         }
329         if (SaveMsg->link == NULL)
330                 SaveMsg->link = NewStrBufPlain(HKEY(""));
331
332 #if 0 /* temporarily disable shorter urls. */
333         SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
334                 GetShorterUrls(SaveMsg->description);
335 #endif
336
337         msglen += 1024 + StrLength(SaveMsg->link) + StrLength(SaveMsg->description) ;
338
339         Message = NewStrBufPlain(NULL, msglen);
340
341         StrBufPlain(Message, HKEY(
342                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
343                             "<html><body>\n"));
344 #if 0 /* disable shorter url for now. */
345         SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
346 #endif
347         StrBufAppendBuf(Message, SaveMsg->description, 0);
348         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
349
350         AppendLink(Message, SaveMsg->link, SaveMsg->linkTitle, NULL);
351         AppendLink(Message, SaveMsg->reLink, SaveMsg->reLinkTitle, "Reply to this");
352         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
353
354
355         SaveMsg->Message = Message;
356 }
357
358 eNextState RSSSaveMessage(AsyncIO *IO)
359 {
360         long len;
361         const char *Key;
362         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
363
364         rss_format_item(RSSAggr->ThisMsg);
365
366         RSSAggr->ThisMsg->Msg.cm_fields['M'] =
367                 SmashStrBuf(&RSSAggr->ThisMsg->Message);
368
369         CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
370
371         /* write the uidl to the use table so we don't store this item again */
372         cdb_store(CDB_USETABLE,
373                   SKEY(RSSAggr->ThisMsg->MsgGUID),
374                   &RSSAggr->ThisMsg->ut,
375                   sizeof(struct UseTable) );
376
377         if (GetNextHashPos(RSSAggr->Messages,
378                            RSSAggr->Pos,
379                            &len, &Key,
380                            (void**) &RSSAggr->ThisMsg))
381                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
382         else
383                 return eAbort;
384 }
385
386 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
387 {
388         const char *Key;
389         long len;
390         struct cdbdata *cdbut;
391         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
392
393         /* Find out if we've already seen this item */
394         strcpy(Ctx->ThisMsg->ut.ut_msgid,
395                ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
396         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
397
398         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
399 #ifndef DEBUG_RSS
400         if (cdbut != NULL) {
401                 /* Item has already been seen */
402                 EVRSSC_syslog(LOG_DEBUG,
403                           "%s has already been seen\n",
404                           ChrPtr(Ctx->ThisMsg->MsgGUID));
405                 cdb_free(cdbut);
406
407                 /* rewrite the record anyway, to update the timestamp */
408                 cdb_store(CDB_USETABLE,
409                           SKEY(Ctx->ThisMsg->MsgGUID),
410                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
411
412                 if (GetNextHashPos(Ctx->Messages,
413                                    Ctx->Pos,
414                                    &len, &Key,
415                                    (void**) &Ctx->ThisMsg))
416                         return NextDBOperation(
417                                 IO,
418                                 RSS_FetchNetworkUsetableEntry);
419                 else
420                         return eAbort;
421         }
422         else
423 #endif
424         {
425                 NextDBOperation(IO, RSSSaveMessage);
426                 return eSendMore;
427         }
428 }
429
430 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
431 {
432         struct UseTable ut;
433         u_char rawdigest[MD5_DIGEST_LEN];
434         struct MD5Context md5context;
435         StrBuf *guid;
436         struct cdbdata *cdbut;
437         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
438
439         if (IO->HttpReq.httpcode != 200)
440         {
441
442                 EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n",
443                               IO->HttpReq.httpcode);
444 // TODO: aide error message with rate limit
445                 return eAbort;
446         }
447
448         MD5Init(&md5context);
449
450         MD5Update(&md5context,
451                   (const unsigned char*)SKEY(IO->HttpReq.ReplyData));
452
453         MD5Update(&md5context,
454                   (const unsigned char*)SKEY(Ctx->Url));
455
456         MD5Final(rawdigest, &md5context);
457         guid = NewStrBufPlain(NULL,
458                               MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
459         StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
460         StrBufAppendBufPlain(guid, HKEY("_rssFM"), 0);
461         if (StrLength(guid) > 40)
462                 StrBufCutAt(guid, 40, NULL);
463         /* Find out if we've already seen this item */
464         memcpy(ut.ut_msgid, SKEY(guid));
465         ut.ut_timestamp = time(NULL);
466
467         cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid));
468 #ifndef DEBUG_RSS
469         if (cdbut != NULL) {
470                 /* Item has already been seen */
471                 EVRSSC_syslog(LOG_DEBUG,
472                               "%s has already been seen\n",
473                               ChrPtr(Ctx->Url));
474                 cdb_free(cdbut);
475         }
476
477         /* rewrite the record anyway, to update the timestamp */
478         cdb_store(CDB_USETABLE,
479                   SKEY(guid),
480                   &ut, sizeof(struct UseTable) );
481
482         if (cdbut != NULL) return eAbort;
483 #endif
484         return RSSAggregator_ParseReply(IO);
485 }
486
487 eNextState RSSAggregator_FinishHttp(AsyncIO *IO)
488 {
489         return QueueDBOperation(IO, RSSAggregator_AnalyseReply);
490 }
491
492 /*
493  * Begin a feed parse
494  */
495 int rss_do_fetching(rss_aggregator *RSSAggr)
496 {
497         AsyncIO         *IO = &RSSAggr->IO;
498         rss_item *ri;
499         time_t now;
500
501         now = time(NULL);
502
503         if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
504                 return 0;
505
506         ri = (rss_item*) malloc(sizeof(rss_item));
507         memset(ri, 0, sizeof(rss_item));
508         RSSAggr->Item = ri;
509
510         if (! InitcURLIOStruct(&RSSAggr->IO,
511                                RSSAggr,
512                                "Citadel RSS Client",
513                                RSSAggregator_FinishHttp,
514                                RSSAggregator_Terminate,
515                                RSSAggregator_TerminateDB,
516                                RSSAggregator_ShutdownAbort))
517         {
518                 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
519                 return 0;
520         }
521
522         safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
523                     ChrPtr(RSSAggr->Url),
524                     sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
525
526         EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
527         ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
528         CurlPrepareURL(RSSAggr->IO.ConnectMe);
529
530         QueueCurlContext(&RSSAggr->IO);
531         return 1;
532 }
533
534 /*
535  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
536  */
537 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
538 {
539         StrBuf *CfgData=NULL;
540         StrBuf *CfgType;
541         StrBuf *Line;
542         rss_room_counter *Count = NULL;
543         struct stat statbuf;
544         char filename[PATH_MAX];
545         int fd;
546         int Done;
547         rss_aggregator *RSSAggr = NULL;
548         rss_aggregator *use_this_RSSAggr = NULL;
549         void *vptr;
550         const char *CfgPtr, *lPtr;
551         const char *Err;
552
553         pthread_mutex_lock(&RSSQueueMutex);
554         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
555         {
556                 EVRSSQ_syslog(LOG_DEBUG,
557                               "rssclient: [%ld] %s already in progress.\n",
558                               qrbuf->QRnumber,
559                               qrbuf->QRname);
560                 pthread_mutex_unlock(&RSSQueueMutex);
561                 return;
562         }
563         pthread_mutex_unlock(&RSSQueueMutex);
564
565         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
566
567         if (server_shutting_down)
568                 return;
569
570         /* Only do net processing for rooms that have netconfigs */
571         fd = open(filename, 0);
572         if (fd <= 0) {
573                 /* syslog(LOG_DEBUG,
574                    "rssclient: %s no config.\n",
575                    qrbuf->QRname); */
576                 return;
577         }
578
579         if (server_shutting_down)
580                 return;
581
582         if (fstat(fd, &statbuf) == -1) {
583                 EVRSSQ_syslog(LOG_DEBUG,
584                               "ERROR: could not stat configfile '%s' - %s\n",
585                               filename,
586                               strerror(errno));
587                 return;
588         }
589
590         if (server_shutting_down)
591                 return;
592
593         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
594
595         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
596                 close(fd);
597                 FreeStrBuf(&CfgData);
598                 EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s<br>\n",
599                               filename, strerror(errno));
600                 return;
601         }
602         close(fd);
603         if (server_shutting_down)
604                 return;
605
606         CfgPtr = NULL;
607         CfgType = NewStrBuf();
608         Line = NewStrBufPlain(NULL, StrLength(CfgData));
609         Done = 0;
610         while (!Done)
611         {
612                 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
613                 if (StrLength(Line) > 0)
614                 {
615                         lPtr = NULL;
616                         StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
617                         if (!strcasecmp("rssclient", ChrPtr(CfgType)))
618                         {
619                                 if (Count == NULL)
620                                 {
621                                         Count = malloc(
622                                                 sizeof(rss_room_counter));
623                                         Count->count = 0;
624                                 }
625                                 Count->count ++;
626                                 RSSAggr = (rss_aggregator *) malloc(
627                                         sizeof(rss_aggregator));
628
629                                 memset (RSSAggr, 0, sizeof(rss_aggregator));
630                                 RSSAggr->QRnumber = qrbuf->QRnumber;
631                                 RSSAggr->roomlist_parts = 1;
632                                 RSSAggr->Url = NewStrBuf();
633
634                                 StrBufExtract_NextToken(RSSAggr->Url,
635                                                         Line,
636                                                         &lPtr,
637                                                         '|');
638
639                                 pthread_mutex_lock(&RSSQueueMutex);
640                                 GetHash(RSSFetchUrls,
641                                         SKEY(RSSAggr->Url),
642                                         &vptr);
643
644                                 use_this_RSSAggr = (rss_aggregator *)vptr;
645                                 if (use_this_RSSAggr != NULL)
646                                 {
647                                         long *QRnumber;
648                                         StrBufAppendBufPlain(
649                                                 use_this_RSSAggr->rooms,
650                                                 qrbuf->QRname,
651                                                 -1, 0);
652                                         if (use_this_RSSAggr->roomlist_parts==1)
653                                         {
654                                                 use_this_RSSAggr->OtherQRnumbers
655                                                         = NewHash(1, lFlathash);
656                                         }
657                                         QRnumber = (long*)malloc(sizeof(long));
658                                         *QRnumber = qrbuf->QRnumber;
659                                         Put(use_this_RSSAggr->OtherQRnumbers,
660                                             LKEY(qrbuf->QRnumber),
661                                             QRnumber,
662                                             NULL);
663                                         use_this_RSSAggr->roomlist_parts++;
664
665                                         pthread_mutex_unlock(&RSSQueueMutex);
666
667                                         FreeStrBuf(&RSSAggr->Url);
668                                         free(RSSAggr);
669                                         RSSAggr = NULL;
670                                         continue;
671                                 }
672                                 pthread_mutex_unlock(&RSSQueueMutex);
673
674                                 RSSAggr->ItemType = RSS_UNSET;
675
676                                 RSSAggr->rooms = NewStrBufPlain(
677                                         qrbuf->QRname, -1);
678
679                                 pthread_mutex_lock(&RSSQueueMutex);
680
681                                 Put(RSSFetchUrls,
682                                     SKEY(RSSAggr->Url),
683                                     RSSAggr,
684                                     DeleteRssCfg);
685
686                                 pthread_mutex_unlock(&RSSQueueMutex);
687                         }
688                 }
689         }
690         if (Count != NULL)
691         {
692                 Count->QRnumber = qrbuf->QRnumber;
693                 pthread_mutex_lock(&RSSQueueMutex);
694                 EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n",
695                               qrbuf->QRnumber, qrbuf->QRname);
696                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
697                 pthread_mutex_unlock(&RSSQueueMutex);
698         }
699         FreeStrBuf(&CfgData);
700         FreeStrBuf(&CfgType);
701         FreeStrBuf(&Line);
702 }
703
704 /*
705  * Scan for rooms that have RSS client requests configured
706  */
707 void rssclient_scan(void) {
708         int RSSRoomCount, RSSCount;
709         rss_aggregator *rptr = NULL;
710         void *vrptr = NULL;
711         HashPos *it;
712         long len;
713         const char *Key;
714         time_t now = time(NULL);
715
716         /* Run no more than once every 15 minutes. */
717         if ((now - last_run) < 900) {
718                 EVRSSQ_syslog(LOG_DEBUG,
719                               "Client: polling interval not yet reached; last run was %ldm%lds ago",
720                               ((now - last_run) / 60),
721                               ((now - last_run) % 60)
722                 );
723                 return;
724         }
725
726         /*
727          * This is a simple concurrency check to make sure only one rssclient
728          * run is done at a time.
729          */
730         pthread_mutex_lock(&RSSQueueMutex);
731         RSSCount = GetCount(RSSFetchUrls);
732         RSSRoomCount = GetCount(RSSQueueRooms);
733         pthread_mutex_unlock(&RSSQueueMutex);
734
735         if ((RSSRoomCount > 0) || (RSSCount > 0)) {
736                 EVRSSQ_syslog(LOG_DEBUG,
737                               "rssclient: concurrency check failed; %d rooms and %d url's are queued",
738                               RSSRoomCount, RSSCount
739                         );
740                 return;
741         }
742
743         become_session(&rss_CC);
744         EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
745         CtdlForEachRoom(rssclient_scan_room, NULL);
746
747         pthread_mutex_lock(&RSSQueueMutex);
748
749         it = GetNewHashPos(RSSFetchUrls, 0);
750         while (!server_shutting_down &&
751                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
752                (vrptr != NULL)) {
753                 rptr = (rss_aggregator *)vrptr;
754                 if (!rss_do_fetching(rptr))
755                         UnlinkRSSAggregator(rptr);
756         }
757         DeleteHashPos(&it);
758         pthread_mutex_unlock(&RSSQueueMutex);
759
760         EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
761         return;
762 }
763
764 void rss_cleanup(void)
765 {
766         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
767         DeleteHash(&RSSFetchUrls);
768         DeleteHash(&RSSQueueRooms);
769 }
770
771 void LogDebugEnableRSSClient(const int n)
772 {
773         RSSClientDebugEnabled = n;
774 }
775
776 CTDL_MODULE_INIT(rssclient)
777 {
778         if (threading)
779         {
780                 CtdlFillSystemContext(&rss_CC, "rssclient");
781                 pthread_mutex_init(&RSSQueueMutex, NULL);
782                 RSSQueueRooms = NewHash(1, lFlathash);
783                 RSSFetchUrls = NewHash(1, NULL);
784                 syslog(LOG_INFO, "%s\n", curl_version());
785                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
786                 CtdlRegisterEVCleanupHook(rss_cleanup);
787                 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
788         }
789         return "rssclient";
790 }