c8f67c5c8c33d450e7bccaab377ad04d393805a9
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2012 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71 int RSSClientDebugEnabled = 0;
72 #define N ((rss_aggregator*)IO->Data)->Cfg.QRnumber
73
74 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
75
76 #define EVRSSC_syslog(LEVEL, FORMAT, ...)                               \
77         DBGLOG(LEVEL) syslog(LEVEL,                                     \
78                              "%s[%ld]CC[%d][%ld]RSS" FORMAT,            \
79                              IOSTR, IO->ID, CCID, N, __VA_ARGS__)
80
81 #define EVRSSCM_syslog(LEVEL, FORMAT)                                   \
82         DBGLOG(LEVEL) syslog(LEVEL,                                     \
83                              "%s[%ld]CC[%d][%ld]RSS" FORMAT,            \
84                              IOSTR, IO->ID, CCID, N)
85
86 #define EVRSSQ_syslog(LEVEL, FORMAT, ...)                               \
87         DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT,                       \
88                              __VA_ARGS__)
89 #define EVRSSQM_syslog(LEVEL, FORMAT)                   \
90         DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
91
92 #define EVRSSCSM_syslog(LEVEL, FORMAT)                                  \
93         DBGLOG(LEVEL) syslog(LEVEL, "%s[%ld][%ld]RSS" FORMAT,           \
94                              IOSTR, IO->ID, N)
95
96 typedef enum _RSSState {
97         eRSSCreated,
98         eRSSFetching,
99         eRSSFailure,
100         eRSSParsing,
101         eRSSUT
102 } RSSState;
103 ConstStr RSSStates[] = {
104         {HKEY("Aggregator created")},
105         {HKEY("Fetching content")},
106         {HKEY("Failed")},
107         {HKEY("parsing content")},
108         {HKEY("checking usetable")}
109 };
110
111 static void SetRSSState(AsyncIO *IO, RSSState State)
112 {
113         CitContext* CCC = IO->CitContext;
114         if (CCC != NULL)
115                 memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1);
116 }
117
118 void DeleteRoomReference(long QRnumber)
119 {
120         HashPos *At;
121         long HKLen;
122         const char *HK;
123         void *vData = NULL;
124         rss_room_counter *pRoomC;
125
126         At = GetNewHashPos(RSSQueueRooms, 0);
127
128         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
129         {
130                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
131                 if (vData != NULL)
132                 {
133                         pRoomC = (rss_room_counter *) vData;
134                         pRoomC->count --;
135                         if (pRoomC->count == 0)
136                                 DeleteEntryFromHash(RSSQueueRooms, At);
137                 }
138         }
139         DeleteHashPos(&At);
140 }
141
142 void UnlinkRooms(rss_aggregator *RSSAggr)
143 {
144         DeleteRoomReference(RSSAggr->Cfg.QRnumber);
145         if (RSSAggr->OtherQRnumbers != NULL)
146         {
147                 long HKLen;
148                 const char *HK;
149                 HashPos *At;
150                 void *vData;
151
152                 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
153                 while (! server_shutting_down &&
154                        GetNextHashPos(RSSAggr->OtherQRnumbers,
155                                       At,
156                                       &HKLen, &HK,
157                                       &vData) &&
158                        (vData != NULL))
159                 {
160                         pRSSConfig *Data = (pRSSConfig*) vData;
161                         DeleteRoomReference(Data->QRnumber);
162                 }
163
164                 DeleteHashPos(&At);
165         }
166 }
167
168 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
169 {
170         HashPos *At;
171
172         pthread_mutex_lock(&RSSQueueMutex);
173         UnlinkRooms(RSSAggr);
174
175         At = GetNewHashPos(RSSFetchUrls, 0);
176         if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
177         {
178                 DeleteEntryFromHash(RSSFetchUrls, At);
179         }
180         DeleteHashPos(&At);
181         last_run = time(NULL);
182         pthread_mutex_unlock(&RSSQueueMutex);
183 }
184
185 void DeleteRssCfg(void *vptr)
186 {
187         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
188         AsyncIO *IO = &RSSAggr->IO;
189
190         if (IO->CitContext != NULL)
191                 EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
192
193         FreeStrBuf(&RSSAggr->Url);
194         FreeStrBuf(&RSSAggr->rooms);
195         FreeStrBuf(&RSSAggr->CData);
196         FreeStrBuf(&RSSAggr->Key);
197         DeleteHash(&RSSAggr->OtherQRnumbers);
198
199         DeleteHashPos (&RSSAggr->Pos);
200         DeleteHash (&RSSAggr->Messages);
201         if (RSSAggr->recp.recp_room != NULL)
202                 free(RSSAggr->recp.recp_room);
203
204
205         if (RSSAggr->Item != NULL)
206         {
207                 flush_rss_item(RSSAggr->Item);
208
209                 free(RSSAggr->Item);
210         }
211
212         FreeAsyncIOContents(&RSSAggr->IO);
213         memset(RSSAggr, 0, sizeof(rss_aggregator));
214         free(RSSAggr);
215 }
216
217 eNextState RSSAggregator_Terminate(AsyncIO *IO)
218 {
219         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
220
221         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
222
223         StopCurlWatchers(IO);
224         UnlinkRSSAggregator(RSSAggr);
225         return eAbort;
226 }
227
228 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
229 {
230         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
231
232         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
233
234
235         StopDBWatchers(&RSSAggr->IO);
236         UnlinkRSSAggregator(RSSAggr);
237         return eAbort;
238 }
239
240 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
241 {
242         const char *pUrl;
243         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
244
245         pUrl = IO->ConnectMe->PlainUrl;
246         if (pUrl == NULL)
247                 pUrl = "";
248
249         EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
250
251         StopCurlWatchers(IO);
252         UnlinkRSSAggregator(RSSAggr);
253         return eAbort;
254 }
255
256 void AppendLink(StrBuf *Message,
257                 StrBuf *link,
258                 StrBuf *LinkTitle,
259                 const char *Title)
260 {
261         if (StrLength(link) > 0)
262         {
263                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
264                 StrBufAppendBuf(Message, link, 0);
265                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
266                 if (StrLength(LinkTitle) > 0)
267                         StrBufAppendBuf(Message, LinkTitle, 0);
268                 else if ((Title != NULL) && !IsEmptyStr(Title))
269                         StrBufAppendBufPlain(Message, Title, -1, 0);
270                 else
271                         StrBufAppendBuf(Message, link, 0);
272                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
273         }
274 }
275
276
277 int rss_format_item(AsyncIO *IO, networker_save_message *SaveMsg)
278 {
279         StrBuf *Message;
280         int msglen = 0;
281
282         if (StrLength(SaveMsg->description) + 
283             StrLength(SaveMsg->link) + 
284             StrLength(SaveMsg->linkTitle) + 
285             StrLength(SaveMsg->reLink) +
286             StrLength(SaveMsg->reLinkTitle) +
287             StrLength(SaveMsg->title) == 0)
288         {
289                 EVRSSCM_syslog(LOG_INFO, "Refusing to save empty message.");
290                 return 0;
291         }
292
293         CM_Flush(&SaveMsg->Msg);
294
295         if (SaveMsg->author_or_creator != NULL) {
296
297                 char *From;
298                 StrBuf *Encoded = NULL;
299                 int FromAt;
300
301                 From = html_to_ascii(ChrPtr(SaveMsg->author_or_creator),
302                                      StrLength(SaveMsg->author_or_creator),
303                                      512, 0);
304                 StrBufPlain(SaveMsg->author_or_creator, From, -1);
305                 StrBufTrim(SaveMsg->author_or_creator);
306                 free(From);
307
308                 FromAt = strchr(ChrPtr(SaveMsg->author_or_creator), '@') != NULL;
309                 if (!FromAt && StrLength (SaveMsg->author_email) > 0)
310                 {
311                         StrBufRFC2047encode(&Encoded, SaveMsg->author_or_creator);
312                         CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded);
313                         CM_SetAsFieldSB(&SaveMsg->Msg, eMessagePath, &SaveMsg->author_email);
314                 }
315                 else
316                 {
317                         if (FromAt)
318                         {
319                                 CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &SaveMsg->author_or_creator);
320                                 CM_CopyField(&SaveMsg->Msg, eMessagePath, eAuthor);
321                         }
322                         else
323                         {
324                                 StrBufRFC2047encode(&Encoded,
325                                                     SaveMsg->author_or_creator);
326                                 CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded);
327                                 CM_SetField(&SaveMsg->Msg, eMessagePath, HKEY("rss@localhost"));
328
329                         }
330                 }
331         }
332         else {
333                 CM_SetField(&SaveMsg->Msg, eAuthor, HKEY("rss"));
334         }
335
336         CM_SetField(&SaveMsg->Msg, eNodeName, CFG_KEY(c_nodename));
337         if (SaveMsg->title != NULL) {
338                 long len;
339                 char *Sbj;
340                 StrBuf *Encoded, *QPEncoded;
341
342                 QPEncoded = NULL;
343                 StrBufSpaceToBlank(SaveMsg->title);
344                 len = StrLength(SaveMsg->title);
345                 Sbj = html_to_ascii(ChrPtr(SaveMsg->title), len, 512, 0);
346                 len = strlen(Sbj);
347                 if ((len > 0) && (Sbj[len - 1] == '\n'))
348                 {
349                         len --;
350                         Sbj[len] = '\0';
351                 }
352                 Encoded = NewStrBufPlain(Sbj, len);
353                 free(Sbj);
354
355                 StrBufTrim(Encoded);
356                 StrBufRFC2047encode(&QPEncoded, Encoded);
357
358                 CM_SetAsFieldSB(&SaveMsg->Msg, eMsgSubject, &QPEncoded);
359                 FreeStrBuf(&Encoded);
360         }
361         if (SaveMsg->link == NULL)
362                 SaveMsg->link = NewStrBufPlain(HKEY(""));
363
364 #if 0 /* temporarily disable shorter urls. */
365         SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
366                 GetShorterUrls(SaveMsg->description);
367 #endif
368
369         msglen += 1024 + StrLength(SaveMsg->link) + StrLength(SaveMsg->description) ;
370
371         Message = NewStrBufPlain(NULL, msglen);
372
373         StrBufPlain(Message, HKEY(
374                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
375                             "<html><body>\n"));
376 #if 0 /* disable shorter url for now. */
377         SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
378 #endif
379         StrBufAppendBuf(Message, SaveMsg->description, 0);
380         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
381
382         AppendLink(Message, SaveMsg->link, SaveMsg->linkTitle, NULL);
383         AppendLink(Message, SaveMsg->reLink, SaveMsg->reLinkTitle, "Reply to this");
384         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
385
386         SaveMsg->Message = Message;
387         return 1;
388 }
389
390 eNextState RSSSaveMessage(AsyncIO *IO)
391 {
392         long len;
393         const char *Key;
394         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
395
396         if (rss_format_item(IO, RSSAggr->ThisMsg))
397         {
398                 CM_SetAsFieldSB(&RSSAggr->ThisMsg->Msg, eMesageText,
399                                        &RSSAggr->ThisMsg->Message);
400
401                 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
402                 
403                 /* write the uidl to the use table so we don't store this item again */
404                 
405                 CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, EvGetNow(IO), 0, eWrite, CCID, IO->ID);
406         }
407
408         if (GetNextHashPos(RSSAggr->Messages,
409                            RSSAggr->Pos,
410                            &len, &Key,
411                            (void**) &RSSAggr->ThisMsg))
412                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
413         else
414                 return eAbort;
415 }
416
417 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
418 {
419         static const time_t antiExpire = USETABLE_ANTIEXPIRE_HIRES;
420 #ifndef DEBUG_RSS
421         time_t seenstamp = 0;
422         const char *Key;
423         long len;
424         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
425
426         /* Find out if we've already seen this item */
427 // todo: expiry?
428         SetRSSState(IO, eRSSUT);
429         seenstamp = CheckIfAlreadySeen("RSS Item Seen",
430                                        Ctx->ThisMsg->MsgGUID,
431                                        EvGetNow(IO),
432                                        antiExpire,
433                                        eCheckUpdate,
434                                        CCID, IO->ID);
435         if (seenstamp != 0)
436         {
437                 /* Item has already been seen */
438                 EVRSSC_syslog(LOG_DEBUG,
439                               "%s has already been seen - %ld < %ld",
440                               ChrPtr(Ctx->ThisMsg->MsgGUID),
441                               seenstamp, antiExpire);
442
443                 SetRSSState(IO, eRSSParsing);
444
445                 if (GetNextHashPos(Ctx->Messages,
446                                    Ctx->Pos,
447                                    &len, &Key,
448                                    (void**) &Ctx->ThisMsg))
449                         return NextDBOperation(
450                                 IO,
451                                 RSS_FetchNetworkUsetableEntry);
452                 else
453                         return eAbort;
454         }
455         else
456 #endif
457         {
458                 /* Item has already been seen */
459                 EVRSSC_syslog(LOG_DEBUG,
460                               "%s Parsing - %ld >= %ld",
461                               ChrPtr(Ctx->ThisMsg->MsgGUID),
462                               seenstamp, antiExpire);
463                 SetRSSState(IO, eRSSParsing);
464
465                 NextDBOperation(IO, RSSSaveMessage);
466                 return eSendMore;
467         }
468         return eSendMore;
469 }
470
471 void UpdateLastKnownGood(pRSSConfig *pCfg, time_t now)
472 {
473         OneRoomNetCfg* pRNCfg;
474         begin_critical_section(S_NETCONFIGS);
475         pRNCfg = CtdlGetNetCfgForRoom (pCfg->QRnumber);
476         if (pRNCfg != NULL)
477         {
478                 RSSCfgLine *RSSCfg = (RSSCfgLine *)pRNCfg->NetConfigs[rssclient];
479
480                 while (RSSCfg != NULL)
481                 {
482                         if (RSSCfg == pCfg->pCfg)
483                                 break;
484
485                         RSSCfg = RSSCfg->next;
486                 }
487                 if (RSSCfg != NULL)
488                 {
489                         pRNCfg->changed = 1;
490                         RSSCfg->last_known_good = now;
491                 }
492         }
493
494         end_critical_section(S_NETCONFIGS);
495 }
496
497 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
498 {
499         HashPos *it = NULL;
500         long len;
501         const char *Key;
502         pRSSConfig *pCfg;
503         u_char rawdigest[MD5_DIGEST_LEN];
504         struct MD5Context md5context;
505         StrBuf *guid;
506         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
507
508         if (IO->HttpReq.httpcode != 200)
509         {
510                 StrBuf *ErrMsg;
511                 long lens[2];
512                 const char *strs[2];
513
514                 SetRSSState(IO, eRSSFailure);
515                 ErrMsg = NewStrBuf();
516                 if (IO) EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n",
517                               IO->HttpReq.httpcode);
518                 
519                 strs[0] = ChrPtr(Ctx->Url);
520                 lens[0] = StrLength(Ctx->Url);
521
522                 strs[1] = ChrPtr(Ctx->rooms);
523                 lens[1] = StrLength(Ctx->rooms);
524
525                 if (IO->HttpReq.CurlError == NULL)
526                         IO->HttpReq.CurlError = "";
527
528                 StrBufPrintf(ErrMsg,
529                              "Error while RSS-Aggregation Run of %s\n"
530                              " need a 200, got a %ld !\n"
531                              " Curl Error message: \n%s / %s\n"
532                              " Response text was: \n"
533                              " \n %s\n",
534                              ChrPtr(Ctx->Url),
535                              IO->HttpReq.httpcode,
536                              IO->HttpReq.errdesc,
537                              IO->HttpReq.CurlError,
538                              ChrPtr(IO->HttpReq.ReplyData)
539                         );
540
541                 CtdlAideFPMessage(
542                         ChrPtr(ErrMsg),
543                         "RSS Aggregation run failure",
544                         2, strs, (long*) &lens,
545                         CCID,
546                         IO->ID,
547                         EvGetNow(IO));
548                 
549                 FreeStrBuf(&ErrMsg);
550                 EVRSSC_syslog(LOG_DEBUG,
551                               "RSS feed returned an invalid http status code. <%s><HTTP %ld>\n",
552                               ChrPtr(Ctx->Url),
553                               IO->HttpReq.httpcode);
554                 return eAbort;
555         }
556
557         pCfg = &Ctx->Cfg;
558
559         while (pCfg != NULL)
560         {
561                 UpdateLastKnownGood (pCfg, EvGetNow(IO));
562                 if ((Ctx->roomlist_parts > 1) && 
563                     (it == NULL))
564                 {
565                         it = GetNewHashPos(RSSFetchUrls, 0);
566                 }
567                 if (it != NULL)
568                 {
569                         void *vptr;
570                         if (GetNextHashPos(Ctx->OtherQRnumbers, it, &len, &Key, &vptr))
571                                 pCfg = vptr;
572                         else
573                                 pCfg = NULL;
574                 }
575                 else 
576                         pCfg = NULL;
577         }
578         DeleteHashPos (&it);
579
580         SetRSSState(IO, eRSSUT);
581
582         MD5Init(&md5context);
583
584         MD5Update(&md5context,
585                   (const unsigned char*)SKEY(IO->HttpReq.ReplyData));
586
587         MD5Update(&md5context,
588                   (const unsigned char*)SKEY(Ctx->Url));
589
590         MD5Final(rawdigest, &md5context);
591         guid = NewStrBufPlain(NULL,
592                               MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
593         StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
594         StrBufAppendBufPlain(guid, HKEY("_rssFM"), 0);
595         if (StrLength(guid) > 40)
596                 StrBufCutAt(guid, 40, NULL);
597         /* Find out if we've already seen this item */
598
599 #ifndef DEBUG_RSS
600
601         if (CheckIfAlreadySeen("RSS Whole",
602                                guid,
603                                EvGetNow(IO),
604                                EvGetNow(IO) - USETABLE_ANTIEXPIRE,
605                                eCheckUpdate,
606                                CCID, IO->ID)
607             != 0)
608         {
609                 FreeStrBuf(&guid);
610
611                 EVRSSC_syslog(LOG_DEBUG, "RSS feed already seen. <%s>\n", ChrPtr(Ctx->Url));
612                 return eAbort;
613         }
614         FreeStrBuf(&guid);
615 #endif
616         SetRSSState(IO, eRSSParsing);
617         return RSSAggregator_ParseReply(IO);
618 }
619
620 eNextState RSSAggregator_FinishHttp(AsyncIO *IO)
621 {
622         return CurlQueueDBOperation(IO, RSSAggregator_AnalyseReply);
623 }
624
625 /*
626  * Begin a feed parse
627  */
628 int rss_do_fetching(rss_aggregator *RSSAggr)
629 {
630         AsyncIO         *IO = &RSSAggr->IO;
631         rss_item *ri;
632         time_t now;
633
634         now = time(NULL);
635
636         if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
637                 return 0;
638
639         ri = (rss_item*) malloc(sizeof(rss_item));
640         memset(ri, 0, sizeof(rss_item));
641         RSSAggr->Item = ri;
642
643         if (! InitcURLIOStruct(&RSSAggr->IO,
644                                RSSAggr,
645                                "Citadel RSS Client",
646                                RSSAggregator_FinishHttp,
647                                RSSAggregator_Terminate,
648                                RSSAggregator_TerminateDB,
649                                RSSAggregator_ShutdownAbort))
650         {
651                 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
652                 return 0;
653         }
654         SetRSSState(IO, eRSSCreated);
655
656         safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
657                     ChrPtr(RSSAggr->Url),
658                     sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
659
660         EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
661         ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
662         CurlPrepareURL(RSSAggr->IO.ConnectMe);
663
664         SetRSSState(IO, eRSSFetching);
665         QueueCurlContext(&RSSAggr->IO);
666         return 1;
667 }
668
669 /*
670  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
671  */
672 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
673 {
674         const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient];
675         rss_aggregator *RSSAggr = NULL;
676         rss_aggregator *use_this_RSSAggr = NULL;
677         void *vptr;
678
679         pthread_mutex_lock(&RSSQueueMutex);
680         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
681         {
682                 EVRSSQ_syslog(LOG_DEBUG,
683                               "rssclient: [%ld] %s already in progress.\n",
684                               qrbuf->QRnumber,
685                               qrbuf->QRname);
686                 pthread_mutex_unlock(&RSSQueueMutex);
687                 return;
688         }
689         pthread_mutex_unlock(&RSSQueueMutex);
690
691         if (server_shutting_down) return;
692
693         while (RSSCfg != NULL)
694         {
695                 pthread_mutex_lock(&RSSQueueMutex);
696                 GetHash(RSSFetchUrls,
697                         SKEY(RSSCfg->Url),
698                         &vptr);
699
700                 use_this_RSSAggr = (rss_aggregator *)vptr;
701                 if (use_this_RSSAggr != NULL)
702                 {
703                         pRSSConfig *pRSSCfg;
704
705                         StrBufAppendBufPlain(
706                                 use_this_RSSAggr->rooms,
707                                 qrbuf->QRname,
708                                 -1, 0);
709                         if (use_this_RSSAggr->roomlist_parts==1)
710                         {
711                                 use_this_RSSAggr->OtherQRnumbers
712                                         = NewHash(1, lFlathash);
713                         }
714
715                         pRSSCfg = (pRSSConfig *) malloc(sizeof(pRSSConfig));
716
717                         pRSSCfg->QRnumber = qrbuf->QRnumber;
718                         pRSSCfg->pCfg = RSSCfg;
719
720                         Put(use_this_RSSAggr->OtherQRnumbers,
721                             LKEY(qrbuf->QRnumber),
722                             pRSSCfg,
723                             NULL);
724                         use_this_RSSAggr->roomlist_parts++;
725
726                         pthread_mutex_unlock(&RSSQueueMutex);
727
728                         RSSCfg = RSSCfg->next;
729                         continue;
730                 }
731                 pthread_mutex_unlock(&RSSQueueMutex);
732
733                 RSSAggr = (rss_aggregator *) malloc(
734                         sizeof(rss_aggregator));
735
736                 memset (RSSAggr, 0, sizeof(rss_aggregator));
737                 RSSAggr->Cfg.QRnumber = qrbuf->QRnumber;
738                 RSSAggr->Cfg.pCfg = RSSCfg;
739                 RSSAggr->roomlist_parts = 1;
740                 RSSAggr->Url = NewStrBufDup(RSSCfg->Url);
741
742                 RSSAggr->ItemType = RSS_UNSET;
743
744                 RSSAggr->rooms = NewStrBufPlain(
745                         qrbuf->QRname, -1);
746
747                 pthread_mutex_lock(&RSSQueueMutex);
748
749                 Put(RSSFetchUrls,
750                     SKEY(RSSAggr->Url),
751                     RSSAggr,
752                     DeleteRssCfg);
753
754                 pthread_mutex_unlock(&RSSQueueMutex);
755                 RSSCfg = RSSCfg->next;
756         }
757 }
758
759 /*
760  * Scan for rooms that have RSS client requests configured
761  */
762 void rssclient_scan(void) {
763         int RSSRoomCount, RSSCount;
764         rss_aggregator *rptr = NULL;
765         void *vrptr = NULL;
766         HashPos *it;
767         long len;
768         const char *Key;
769         time_t now = time(NULL);
770
771         /* Run no more than once every 15 minutes. */
772         if ((now - last_run) < 900) {
773                 EVRSSQ_syslog(LOG_DEBUG,
774                               "Client: polling interval not yet reached; last run was %ldm%lds ago",
775                               ((now - last_run) / 60),
776                               ((now - last_run) % 60)
777                 );
778                 return;
779         }
780
781         /*
782          * This is a simple concurrency check to make sure only one rssclient
783          * run is done at a time.
784          */
785         pthread_mutex_lock(&RSSQueueMutex);
786         RSSCount = GetCount(RSSFetchUrls);
787         RSSRoomCount = GetCount(RSSQueueRooms);
788         pthread_mutex_unlock(&RSSQueueMutex);
789
790         if ((RSSRoomCount > 0) || (RSSCount > 0)) {
791                 EVRSSQ_syslog(LOG_DEBUG,
792                               "rssclient: concurrency check failed; %d rooms and %d url's are queued",
793                               RSSRoomCount, RSSCount
794                         );
795                 return;
796         }
797
798         become_session(&rss_CC);
799         EVRSSQM_syslog(LOG_DEBUG, "rssclient started");
800         CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient);
801
802         if (GetCount(RSSFetchUrls) > 0)
803         {
804                 pthread_mutex_lock(&RSSQueueMutex);
805                 EVRSSQ_syslog(LOG_DEBUG,
806                                "rssclient starting %d Clients",
807                                GetCount(RSSFetchUrls));
808                 
809                 it = GetNewHashPos(RSSFetchUrls, 0);
810                 while (!server_shutting_down &&
811                        GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
812                        (vrptr != NULL)) {
813                         rptr = (rss_aggregator *)vrptr;
814                         if (!rss_do_fetching(rptr))
815                                 UnlinkRSSAggregator(rptr);
816                 }
817                 DeleteHashPos(&it);
818                 pthread_mutex_unlock(&RSSQueueMutex);
819         }
820         else
821                 EVRSSQM_syslog(LOG_DEBUG, "Nothing to do.");
822
823         EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
824         return;
825 }
826
827 void rss_cleanup(void)
828 {
829         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
830         DeleteHash(&RSSFetchUrls);
831         DeleteHash(&RSSQueueRooms);
832 }
833
834 void LogDebugEnableRSSClient(const int n)
835 {
836         RSSClientDebugEnabled = n;
837 }
838
839
840 typedef struct __RSSVetoInfo {
841         StrBuf *ErrMsg;
842         time_t Now;
843         int Veto;
844 }RSSVetoInfo;
845
846 void rssclient_veto_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
847 {
848         RSSVetoInfo *Info = (RSSVetoInfo *) data;
849         const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient];
850
851         while (RSSCfg != NULL)
852         {
853                 if ((RSSCfg->last_known_good != 0) &&
854                     (RSSCfg->last_known_good + USETABLE_ANTIEXPIRE < Info->Now))
855                 {
856                         StrBufAppendPrintf(Info->ErrMsg,
857                                            "RSS feed not seen for a %d days:: <",
858                                            (Info->Now - RSSCfg->last_known_good) / (24 * 60 * 60));
859
860                         StrBufAppendBuf(Info->ErrMsg, RSSCfg->Url, 0);
861                         StrBufAppendBufPlain(Info->ErrMsg, HKEY(">\n"), 0);
862                 }
863                 RSSCfg = RSSCfg->next;
864         }
865 }
866
867 int RSSCheckUsetableVeto(StrBuf *ErrMsg)
868 {
869         RSSVetoInfo Info;
870
871         Info.ErrMsg = ErrMsg;
872         Info.Now = time (NULL);
873         Info.Veto = 0;
874
875         CtdlForEachNetCfgRoom(rssclient_veto_scan_room, &Info, rssclient);
876
877         return Info.Veto;;
878 }
879
880
881
882
883 void ParseRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, OneRoomNetCfg *OneRNCFG)
884 {
885         RSSCfgLine *RSSCfg;
886
887         RSSCfg = (RSSCfgLine *) malloc (sizeof(RSSCfgLine));
888         RSSCfg->Url = NewStrBufPlain (NULL, StrLength (Line));
889         
890
891         StrBufExtract_NextToken(RSSCfg->Url, Line, &LinePos, '|');
892         RSSCfg->last_known_good = StrBufExtractNext_long(Line, &LinePos, '|');
893
894
895         RSSCfg->next = (RSSCfgLine *)OneRNCFG->NetConfigs[ThisOne->C];
896         OneRNCFG->NetConfigs[ThisOne->C] = (RoomNetCfgLine*) RSSCfg;
897 }
898
899 void SerializeRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *OutputBuffer, OneRoomNetCfg *RNCfg, RoomNetCfgLine *data)
900 {
901         RSSCfgLine *RSSCfg = (RSSCfgLine*) data;
902
903         StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0);
904         StrBufAppendBufPlain(OutputBuffer, HKEY("|"), 0);
905         StrBufAppendBufPlain(OutputBuffer, SKEY(RSSCfg->Url), 0);
906         StrBufAppendPrintf(OutputBuffer, "|%ld\n", RSSCfg->last_known_good);
907 }
908
909 void DeleteRSSClientCfgLine(const CfgLineType *ThisOne, RoomNetCfgLine **data)
910 {
911         RSSCfgLine *RSSCfg = (RSSCfgLine*) *data;
912
913         FreeStrBuf(&RSSCfg->Url);
914         free(*data);
915         *data = NULL;
916 }
917
918
919 CTDL_MODULE_INIT(rssclient)
920 {
921         if (!threading)
922         {
923                 CtdlRegisterTDAPVetoHook (RSSCheckUsetableVeto, CDB_USETABLE, 0);
924
925                 CtdlREGISTERRoomCfgType(rssclient, ParseRSSClientCfgLine, 0, 1, SerializeRSSClientCfgLine, DeleteRSSClientCfgLine);
926                 pthread_mutex_init(&RSSQueueMutex, NULL);
927                 RSSQueueRooms = NewHash(1, lFlathash);
928                 RSSFetchUrls = NewHash(1, NULL);
929                 syslog(LOG_INFO, "%s\n", curl_version());
930                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
931                 CtdlRegisterEVCleanupHook(rss_cleanup);
932                 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
933         }
934         else
935         {
936                 CtdlFillSystemContext(&rss_CC, "rssclient");
937         }
938         return "rssclient";
939 }