4daba9bddb52e548a948250b0e3c7cdf24afc4bf
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2016 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71 int RSSClientDebugEnabled = 0;
72 #define N ((rss_aggregator*)IO->Data)->Cfg.QRnumber
73
/*
 * Logging helpers.
 *
 * DBGLOG(LEVEL) guards the single statement that follows it: LOG_DEBUG
 * output is emitted only while RSSClientDebugEnabled is non-zero, any other
 * level always passes.  The guard is written as "if (suppressed) { } else"
 * so that the hidden if already owns an else branch; a caller writing
 *
 *	if (x) EVRSSQM_syslog(...); else something_else();
 *
 * therefore keeps its else attached to its own if.
 *
 * EVRSSC_syslog, EVRSSCM_syslog and EVRSSCSM_syslog expand IOSTR, IO->ID and
 * N (and CCID for the first two), so they need an "AsyncIO *IO" in scope.
 * EVRSSQ_syslog and EVRSSQM_syslog have no such requirement.  The variants
 * with a trailing M take no variadic arguments.
 */
#define DBGLOG(LEVEL)							\
	if (((LEVEL) == LOG_DEBUG) && (RSSClientDebugEnabled == 0)) { } else

#define EVRSSC_syslog(LEVEL, FORMAT, ...)				\
	DBGLOG(LEVEL) syslog(LEVEL,					\
			     "%s[%ld]CC[%d][%ld]RSS" FORMAT,		\
			     IOSTR, IO->ID, CCID, N, __VA_ARGS__)

#define EVRSSCM_syslog(LEVEL, FORMAT)					\
	DBGLOG(LEVEL) syslog(LEVEL,					\
			     "%s[%ld]CC[%d][%ld]RSS" FORMAT,		\
			     IOSTR, IO->ID, CCID, N)

#define EVRSSQ_syslog(LEVEL, FORMAT, ...)				\
	DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT,			\
			     __VA_ARGS__)
#define EVRSSQM_syslog(LEVEL, FORMAT)			\
	DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)

#define EVRSSCSM_syslog(LEVEL, FORMAT)					\
	DBGLOG(LEVEL) syslog(LEVEL, "%s[%ld][%ld]RSS" FORMAT,		\
			     IOSTR, IO->ID, N)
95
96 typedef enum _RSSState {
97         eRSSCreated,
98         eRSSFetching,
99         eRSSFailure,
100         eRSSParsing,
101         eRSSUT
102 } RSSState;
103 ConstStr RSSStates[] = {
104         {HKEY("Aggregator created")},
105         {HKEY("Fetching content")},
106         {HKEY("Failed")},
107         {HKEY("parsing content")},
108         {HKEY("checking usetable")}
109 };
110
111
112 static size_t GetLocationString( void *ptr, size_t size, size_t nmemb, void *userdata)
113 {
114 #define LOCATION "location"
115         if (strncasecmp((char*)ptr, LOCATION, sizeof(LOCATION) - 1) == 0)
116         {
117                 AsyncIO *IO = (AsyncIO *) userdata;
118                 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
119
120                 char *pch = (char*) ptr;
121                 char *pche;
122                 
123                 pche = pch + (size * nmemb);
124                 pch += sizeof(LOCATION);
125                 
126                 while (isspace(*pch) || (*pch == ':'))
127                         pch ++;
128
129                 while (isspace(*pche) || (*pche == '\0'))
130                         pche--;
131                 if (RSSAggr->RedirectUrl == NULL) {
132                         RSSAggr->RedirectUrl = NewStrBufPlain(pch, pche - pch + 1);
133                 }
134                 else {
135                         FlushStrBuf(RSSAggr->RedirectUrl);
136                         StrBufPlain(RSSAggr->RedirectUrl, pch, pche - pch + 1); 
137                 }
138         }
139         return size * nmemb;
140 }
141
142
143 static void SetRSSState(AsyncIO *IO, RSSState State)
144 {
145         CitContext* CCC = IO->CitContext;
146         if (CCC != NULL) {
147                 memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1);
148         }
149 }
150
151
152 void DeleteRoomReference(long QRnumber)
153 {
154         HashPos *At;
155         long HKLen;
156         const char *HK;
157         void *vData = NULL;
158         rss_room_counter *pRoomC;
159
160         At = GetNewHashPos(RSSQueueRooms, 0);
161
162         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
163         {
164                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
165                 if (vData != NULL)
166                 {
167                         pRoomC = (rss_room_counter *) vData;
168                         pRoomC->count --;
169                         if (pRoomC->count == 0)
170                                 DeleteEntryFromHash(RSSQueueRooms, At);
171                 }
172         }
173         DeleteHashPos(&At);
174 }
175
176 void UnlinkRooms(rss_aggregator *RSSAggr)
177 {
178         DeleteRoomReference(RSSAggr->Cfg.QRnumber);
179         if (RSSAggr->OtherQRnumbers != NULL)
180         {
181                 long HKLen;
182                 const char *HK;
183                 HashPos *At;
184                 void *vData;
185
186                 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
187                 while (! server_shutting_down &&
188                        GetNextHashPos(RSSAggr->OtherQRnumbers,
189                                       At,
190                                       &HKLen, &HK,
191                                       &vData) &&
192                        (vData != NULL))
193                 {
194                         pRSSConfig *Data = (pRSSConfig*) vData;
195                         DeleteRoomReference(Data->QRnumber);
196                 }
197
198                 DeleteHashPos(&At);
199         }
200 }
201
202 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
203 {
204         HashPos *At;
205
206         pthread_mutex_lock(&RSSQueueMutex);
207         UnlinkRooms(RSSAggr);
208
209         At = GetNewHashPos(RSSFetchUrls, 0);
210         if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
211         {
212                 DeleteEntryFromHash(RSSFetchUrls, At);
213         }
214         DeleteHashPos(&At);
215         last_run = time(NULL);
216         pthread_mutex_unlock(&RSSQueueMutex);
217 }
218
219 void DeleteRssCfg(void *vptr)
220 {
221         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
222         AsyncIO *IO = &RSSAggr->IO;
223
224         if (IO->CitContext != NULL) {
225                 EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying");
226         }
227
228         FreeStrBuf(&RSSAggr->Url);
229         FreeStrBuf(&RSSAggr->RedirectUrl);
230         FreeStrBuf(&RSSAggr->rooms);
231         FreeStrBuf(&RSSAggr->CData);
232         FreeStrBuf(&RSSAggr->Key);
233         DeleteHash(&RSSAggr->OtherQRnumbers);
234
235         DeleteHashPos (&RSSAggr->Pos);
236         DeleteHash (&RSSAggr->Messages);
237         if (RSSAggr->recp.recp_room != NULL)
238                 free(RSSAggr->recp.recp_room);
239
240
241         if (RSSAggr->Item != NULL)
242         {
243                 flush_rss_item(RSSAggr->Item);
244
245                 free(RSSAggr->Item);
246         }
247
248         FreeAsyncIOContents(&RSSAggr->IO);
249         memset(RSSAggr, 0, sizeof(rss_aggregator));
250         free(RSSAggr);
251 }
252
253 eNextState RSSAggregator_Terminate(AsyncIO *IO)
254 {
255         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
256
257         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.");
258
259         StopCurlWatchers(IO);
260         UnlinkRSSAggregator(RSSAggr);
261         return eAbort;
262 }
263
264 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
265 {
266         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
267
268         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.");
269
270
271         StopDBWatchers(&RSSAggr->IO);
272         UnlinkRSSAggregator(RSSAggr);
273         return eAbort;
274 }
275
276 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
277 {
278         const char *pUrl;
279         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
280
281         pUrl = IO->ConnectMe->PlainUrl;
282         if (pUrl == NULL)
283                 pUrl = "";
284
285         EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.", pUrl);
286
287         StopCurlWatchers(IO);
288         UnlinkRSSAggregator(RSSAggr);
289         return eAbort;
290 }
291
292 void AppendLink(StrBuf *Message,
293                 StrBuf *link,
294                 StrBuf *LinkTitle,
295                 const char *Title)
296 {
297         if (StrLength(link) > 0)
298         {
299                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
300                 StrBufAppendBuf(Message, link, 0);
301                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
302                 if (StrLength(LinkTitle) > 0)
303                         StrBufAppendBuf(Message, LinkTitle, 0);
304                 else if ((Title != NULL) && !IsEmptyStr(Title))
305                         StrBufAppendBufPlain(Message, Title, -1, 0);
306                 else
307                         StrBufAppendBuf(Message, link, 0);
308                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
309         }
310 }
311
312
313 int rss_format_item(AsyncIO *IO, networker_save_message *SaveMsg)
314 {
315         StrBuf *Message;
316         int msglen = 0;
317
318         if (StrLength(SaveMsg->description) + 
319             StrLength(SaveMsg->link) + 
320             StrLength(SaveMsg->linkTitle) + 
321             StrLength(SaveMsg->reLink) +
322             StrLength(SaveMsg->reLinkTitle) +
323             StrLength(SaveMsg->title) == 0)
324         {
325                 EVRSSCM_syslog(LOG_INFO, "Refusing to save empty message.");
326                 return 0;
327         }
328
329         CM_Flush(&SaveMsg->Msg);
330
331         if (SaveMsg->author_or_creator != NULL) {
332
333                 char *From;
334                 StrBuf *Encoded = NULL;
335                 int FromAt;
336
337                 From = html_to_ascii(ChrPtr(SaveMsg->author_or_creator),
338                                      StrLength(SaveMsg->author_or_creator),
339                                      512, 0);
340                 StrBufPlain(SaveMsg->author_or_creator, From, -1);
341                 StrBufTrim(SaveMsg->author_or_creator);
342                 free(From);
343
344                 FromAt = strchr(ChrPtr(SaveMsg->author_or_creator), '@') != NULL;
345                 if (!FromAt && StrLength (SaveMsg->author_email) > 0)
346                 {
347                         StrBufRFC2047encode(&Encoded, SaveMsg->author_or_creator);
348                         CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded);
349                         CM_SetAsFieldSB(&SaveMsg->Msg, eMessagePath, &SaveMsg->author_email);
350                 }
351                 else
352                 {
353                         if (FromAt)
354                         {
355                                 CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &SaveMsg->author_or_creator);
356                                 CM_CopyField(&SaveMsg->Msg, eMessagePath, eAuthor);
357                         }
358                         else
359                         {
360                                 StrBufRFC2047encode(&Encoded,
361                                                     SaveMsg->author_or_creator);
362                                 CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded);
363                                 CM_SetField(&SaveMsg->Msg, eMessagePath, HKEY("rss@localhost"));
364
365                         }
366                 }
367         }
368         else {
369                 CM_SetField(&SaveMsg->Msg, eAuthor, HKEY("rss"));
370         }
371
372         CM_SetField(&SaveMsg->Msg, eNodeName, CtdlGetConfigStr("c_nodename"), strlen(CtdlGetConfigStr("c_nodename")));
373         if (SaveMsg->title != NULL) {
374                 long len;
375                 char *Sbj;
376                 StrBuf *Encoded, *QPEncoded;
377
378                 QPEncoded = NULL;
379                 StrBufSpaceToBlank(SaveMsg->title);
380                 len = StrLength(SaveMsg->title);
381                 Sbj = html_to_ascii(ChrPtr(SaveMsg->title), len, 512, 0);
382                 if (!IsEmptyStr(Sbj)) {
383                         len = strlen(Sbj);
384                         if ((Sbj[len - 1] == '\n'))
385                         {
386                                 len --;
387                                 Sbj[len] = '\0';
388                         }
389                         Encoded = NewStrBufPlain(Sbj, len);
390                 
391
392                         StrBufTrim(Encoded);
393                         StrBufRFC2047encode(&QPEncoded, Encoded);
394                         
395                         CM_SetAsFieldSB(&SaveMsg->Msg, eMsgSubject, &QPEncoded);
396                         FreeStrBuf(&Encoded);
397                 }
398                 if (Sbj != NULL) {
399                         free(Sbj);
400                 }
401         }
402         if (SaveMsg->link == NULL)
403                 SaveMsg->link = NewStrBufPlain(HKEY(""));
404
405 #if 0 /* temporarily disable shorter urls. */
406         SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
407                 GetShorterUrls(SaveMsg->description);
408 #endif
409
410         msglen += 1024 + StrLength(SaveMsg->link) + StrLength(SaveMsg->description) ;
411
412         Message = NewStrBufPlain(NULL, msglen);
413
414         StrBufPlain(Message, HKEY(
415                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
416                             "<html><body>\n"));
417 #if 0 /* disable shorter url for now. */
418         SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
419 #endif
420         StrBufAppendBuf(Message, SaveMsg->description, 0);
421         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
422
423         AppendLink(Message, SaveMsg->link, SaveMsg->linkTitle, NULL);
424         AppendLink(Message, SaveMsg->reLink, SaveMsg->reLinkTitle, "Reply to this");
425         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
426
427         SaveMsg->Message = Message;
428         return 1;
429 }
430
431 eNextState RSSSaveMessage(AsyncIO *IO)
432 {
433         long len;
434         const char *Key;
435         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
436
437         if (rss_format_item(IO, RSSAggr->ThisMsg))
438         {
439                 CM_SetAsFieldSB(&RSSAggr->ThisMsg->Msg, eMesageText,
440                                        &RSSAggr->ThisMsg->Message);
441
442                 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
443                 
444                 /* write the uidl to the use table so we don't store this item again */
445                 
446                 CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, EvGetNow(IO), 0, eWrite, CCID, IO->ID);
447         }
448
449         if (GetNextHashPos(RSSAggr->Messages,
450                            RSSAggr->Pos,
451                            &len, &Key,
452                            (void**) &RSSAggr->ThisMsg))
453                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
454         else
455                 return eAbort;
456 }
457
458 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
459 {
460         static const time_t antiExpire = USETABLE_ANTIEXPIRE_HIRES;
461 #ifndef DEBUG_RSS
462         time_t seenstamp = 0;
463         const char *Key;
464         long len;
465         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
466
467         /* Find out if we've already seen this item */
468 // todo: expiry?
469         SetRSSState(IO, eRSSUT);
470         seenstamp = CheckIfAlreadySeen("RSS Item Seen",
471                                        Ctx->ThisMsg->MsgGUID,
472                                        EvGetNow(IO),
473                                        antiExpire,
474                                        eCheckUpdate,
475                                        CCID, IO->ID);
476         if (seenstamp != 0)
477         {
478                 /* Item has already been seen */
479                 EVRSSC_syslog(LOG_DEBUG,
480                               "%s has already been seen - %ld < %ld",
481                               ChrPtr(Ctx->ThisMsg->MsgGUID),
482                               seenstamp, antiExpire);
483
484                 SetRSSState(IO, eRSSParsing);
485
486                 if (GetNextHashPos(Ctx->Messages,
487                                    Ctx->Pos,
488                                    &len, &Key,
489                                    (void**) &Ctx->ThisMsg))
490                         return NextDBOperation(
491                                 IO,
492                                 RSS_FetchNetworkUsetableEntry);
493                 else
494                         return eAbort;
495         }
496         else
497 #endif
498         {
499                 /* Item has already been seen */
500                 EVRSSC_syslog(LOG_DEBUG,
501                               "%s Parsing - %ld >= %ld",
502                               ChrPtr(Ctx->ThisMsg->MsgGUID),
503                               seenstamp, antiExpire);
504                 SetRSSState(IO, eRSSParsing);
505
506                 NextDBOperation(IO, RSSSaveMessage);
507                 return eSendMore;
508         }
509         return eSendMore;
510 }
511
512 void UpdateLastKnownGood(pRSSConfig *pCfg, time_t now)
513 {
514         OneRoomNetCfg *pRNCfg;
515         begin_critical_section(S_NETCONFIGS);
516         pRNCfg = CtdlGetNetCfgForRoom(pCfg->QRnumber);
517         if (pRNCfg != NULL)
518         {
519                 RSSCfgLine *RSSCfg = (RSSCfgLine *)pRNCfg->NetConfigs[rssclient];
520
521                 while (RSSCfg != NULL)
522                 {
523                         if (RSSCfg == pCfg->pCfg)
524                                 break;
525
526                         RSSCfg = RSSCfg->next;
527                 }
528                 if (RSSCfg != NULL)
529                 {
530                         RSSCfg->last_known_good = now;
531                 }
532         }
533         SaveRoomNetConfigFile(pRNCfg, pCfg->QRnumber);
534         FreeRoomNetworkStruct(&pRNCfg);
535         end_critical_section(S_NETCONFIGS);
536 }
537
538 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
539 {
540         HashPos *it = NULL;
541         long len;
542         const char *Key;
543         pRSSConfig *pCfg;
544         u_char rawdigest[MD5_DIGEST_LEN];
545         struct MD5Context md5context;
546         StrBuf *guid;
547         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
548
549
550         if ((IO->HttpReq.httpcode >= 300) && (IO->HttpReq.httpcode < 400)  && (Ctx->RedirectUrl != NULL))
551         {
552                 StrBuf *ErrMsg;
553                 long lens[2];
554                 const char *strs[2];
555
556                 SetRSSState(IO, eRSSFailure);
557                 ErrMsg = NewStrBuf();
558                 if (IO) {
559                         EVRSSC_syslog(LOG_INFO, "need a 200, got a %ld !",
560                                       IO->HttpReq.httpcode);
561                 }
562                 strs[0] = ChrPtr(Ctx->Url);
563                 lens[0] = StrLength(Ctx->Url);
564
565                 strs[1] = ChrPtr(Ctx->rooms);
566                 lens[1] = StrLength(Ctx->rooms);
567
568                 if (IO->HttpReq.CurlError == NULL)
569                         IO->HttpReq.CurlError = "";
570
571                 StrBufPrintf(ErrMsg,
572                              "Error while RSS-Aggregation Run of %s\n"
573                              " need a 200, got a %ld !\n"
574                              " Curl Error message: \n%s / %s\n"
575                              " Redirect header points to: %s\n"
576                              " Response text was: \n"
577                              " \n %s\n",
578                              ChrPtr(Ctx->Url),
579                              IO->HttpReq.httpcode,
580                              IO->HttpReq.errdesc,
581                              IO->HttpReq.CurlError,
582                              ChrPtr(Ctx->RedirectUrl),
583                              ChrPtr(IO->HttpReq.ReplyData)
584                         );
585
586                 CtdlAideFPMessage(
587                         ChrPtr(ErrMsg),
588                         "RSS Aggregation run failure",
589                         2, strs, (long*) &lens,
590                         CCID,
591                         IO->ID,
592                         EvGetNow(IO));
593                 
594                 FreeStrBuf(&ErrMsg);
595                 EVRSSC_syslog(LOG_DEBUG,
596                               "RSS feed returned an invalid http status code. <%s><HTTP %ld>",
597                               ChrPtr(Ctx->Url),
598                               IO->HttpReq.httpcode);
599                 return eAbort;
600         }
601         else if (IO->HttpReq.httpcode != 200)
602         {
603                 StrBuf *ErrMsg;
604                 long lens[2];
605                 const char *strs[2];
606
607                 SetRSSState(IO, eRSSFailure);
608                 ErrMsg = NewStrBuf();
609                 if (IO) {
610                         EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !",
611                                       IO->HttpReq.httpcode);
612                 }
613                 strs[0] = ChrPtr(Ctx->Url);
614                 lens[0] = StrLength(Ctx->Url);
615
616                 strs[1] = ChrPtr(Ctx->rooms);
617                 lens[1] = StrLength(Ctx->rooms);
618
619                 if (IO->HttpReq.CurlError == NULL)
620                         IO->HttpReq.CurlError = "";
621
622                 StrBufPrintf(ErrMsg,
623                              "Error while RSS-Aggregation Run of %s\n"
624                              " need a 200, got a %ld !\n"
625                              " Curl Error message: \n%s / %s\n"
626                              " Response text was: \n"
627                              " \n %s\n",
628                              ChrPtr(Ctx->Url),
629                              IO->HttpReq.httpcode,
630                              IO->HttpReq.errdesc,
631                              IO->HttpReq.CurlError,
632                              ChrPtr(IO->HttpReq.ReplyData)
633                         );
634
635                 CtdlAideFPMessage(
636                         ChrPtr(ErrMsg),
637                         "RSS Aggregation run failure",
638                         2, strs, (long*) &lens,
639                         CCID,
640                         IO->ID,
641                         EvGetNow(IO));
642                 
643                 FreeStrBuf(&ErrMsg);
644                 EVRSSC_syslog(LOG_DEBUG,
645                               "RSS feed returned an invalid http status code. <%s><HTTP %ld>",
646                               ChrPtr(Ctx->Url),
647                               IO->HttpReq.httpcode);
648                 return eAbort;
649         }
650
651         pCfg = &Ctx->Cfg;
652
653         while (pCfg != NULL)
654         {
655                 UpdateLastKnownGood (pCfg, EvGetNow(IO));
656                 if ((Ctx->roomlist_parts > 1) && 
657                     (it == NULL))
658                 {
659                         it = GetNewHashPos(RSSFetchUrls, 0);
660                 }
661                 if (it != NULL)
662                 {
663                         void *vptr;
664                         if (GetNextHashPos(Ctx->OtherQRnumbers, it, &len, &Key, &vptr))
665                                 pCfg = vptr;
666                         else
667                                 pCfg = NULL;
668                 }
669                 else 
670                         pCfg = NULL;
671         }
672         DeleteHashPos (&it);
673
674         SetRSSState(IO, eRSSUT);
675
676         MD5Init(&md5context);
677
678         MD5Update(&md5context,
679                   (const unsigned char*)SKEY(IO->HttpReq.ReplyData));
680
681         MD5Update(&md5context,
682                   (const unsigned char*)SKEY(Ctx->Url));
683
684         MD5Final(rawdigest, &md5context);
685         guid = NewStrBufPlain(NULL,
686                               MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
687         StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
688         StrBufAppendBufPlain(guid, HKEY("_rssFM"), 0);
689         if (StrLength(guid) > 40)
690                 StrBufCutAt(guid, 40, NULL);
691         /* Find out if we've already seen this item */
692
693 #ifndef DEBUG_RSS
694
695         if (CheckIfAlreadySeen("RSS Whole",
696                                guid,
697                                EvGetNow(IO),
698                                EvGetNow(IO) - USETABLE_ANTIEXPIRE,
699                                eUpdate,
700                                CCID, IO->ID)
701             != 0)
702         {
703                 FreeStrBuf(&guid);
704
705                 EVRSSC_syslog(LOG_DEBUG, "RSS feed already seen. <%s>", ChrPtr(Ctx->Url));
706                 return eAbort;
707         }
708         FreeStrBuf(&guid);
709 #endif
710         SetRSSState(IO, eRSSParsing);
711         return RSSAggregator_ParseReply(IO);
712 }
713
714 eNextState RSSAggregator_FinishHttp(AsyncIO *IO)
715 {
716         return CurlQueueDBOperation(IO, RSSAggregator_AnalyseReply);
717 }
718
719 /*
720  * Begin a feed parse
721  */
722 int rss_do_fetching(rss_aggregator *RSSAggr)
723 {
724         AsyncIO         *IO = &RSSAggr->IO;
725         rss_item *ri;
726         time_t now;
727         CURLcode sta;
728         CURL *chnd;
729
730
731         now = time(NULL);
732
733         if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
734                 return 0;
735
736         ri = (rss_item*) malloc(sizeof(rss_item));
737         memset(ri, 0, sizeof(rss_item));
738         RSSAggr->Item = ri;
739
740         if (! InitcURLIOStruct(&RSSAggr->IO,
741                                RSSAggr,
742                                "Citadel RSS Client",
743                                RSSAggregator_FinishHttp,
744                                RSSAggregator_Terminate,
745                                RSSAggregator_TerminateDB,
746                                RSSAggregator_ShutdownAbort))
747         {
748                 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.");
749                 return 0;
750         }
751         chnd = IO->HttpReq.chnd;
752         OPT(HEADERDATA, IO);
753         OPT(HEADERFUNCTION, GetLocationString);
754         SetRSSState(IO, eRSSCreated);
755
756         safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
757                     ChrPtr(RSSAggr->Url),
758                     sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
759
760         EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>", ChrPtr(RSSAggr->Url));
761         ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
762         CurlPrepareURL(RSSAggr->IO.ConnectMe);
763
764         SetRSSState(IO, eRSSFetching);
765         QueueCurlContext(&RSSAggr->IO);
766         return 1;
767 }
768
769 /*
770  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
771  */
772 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
773 {
774         const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient];
775         rss_aggregator *RSSAggr = NULL;
776         rss_aggregator *use_this_RSSAggr = NULL;
777         void *vptr;
778
779         EVRSSQ_syslog(LOG_DEBUG, "rssclient_scan_room(%s)", qrbuf->QRname);
780         pthread_mutex_lock(&RSSQueueMutex);
781         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
782         {
783                 EVRSSQ_syslog(LOG_DEBUG,
784                               "rssclient: [%ld] %s already in progress.",
785                               qrbuf->QRnumber,
786                               qrbuf->QRname);
787                 pthread_mutex_unlock(&RSSQueueMutex);
788                 return;
789         }
790         pthread_mutex_unlock(&RSSQueueMutex);
791
792         if (server_shutting_down) return;
793
794         while (RSSCfg != NULL)
795         {
796                 pthread_mutex_lock(&RSSQueueMutex);
797                 GetHash(RSSFetchUrls,
798                         SKEY(RSSCfg->Url),
799                         &vptr);
800
801                 use_this_RSSAggr = (rss_aggregator *)vptr;
802                 if (use_this_RSSAggr != NULL)
803                 {
804                         pRSSConfig *pRSSCfg;
805
806                         StrBufAppendBufPlain(
807                                 use_this_RSSAggr->rooms,
808                                 qrbuf->QRname,
809                                 -1, 0);
810                         if (use_this_RSSAggr->roomlist_parts==1)
811                         {
812                                 use_this_RSSAggr->OtherQRnumbers
813                                         = NewHash(1, lFlathash);
814                         }
815
816                         pRSSCfg = (pRSSConfig *) malloc(sizeof(pRSSConfig));
817
818                         pRSSCfg->QRnumber = qrbuf->QRnumber;
819                         pRSSCfg->pCfg = RSSCfg;
820
821                         Put(use_this_RSSAggr->OtherQRnumbers,
822                             LKEY(qrbuf->QRnumber),
823                             pRSSCfg,
824                             NULL);
825                         use_this_RSSAggr->roomlist_parts++;
826
827                         pthread_mutex_unlock(&RSSQueueMutex);
828
829                         RSSCfg = RSSCfg->next;
830                         continue;
831                 }
832                 pthread_mutex_unlock(&RSSQueueMutex);
833
834                 RSSAggr = (rss_aggregator *) malloc(
835                         sizeof(rss_aggregator));
836
837                 memset (RSSAggr, 0, sizeof(rss_aggregator));
838                 RSSAggr->Cfg.QRnumber = qrbuf->QRnumber;
839                 RSSAggr->Cfg.pCfg = RSSCfg;
840                 RSSAggr->roomlist_parts = 1;
841                 RSSAggr->Url = NewStrBufDup(RSSCfg->Url);
842
843                 RSSAggr->ItemType = RSS_UNSET;
844
845                 RSSAggr->rooms = NewStrBufPlain(
846                         qrbuf->QRname, -1);
847
848                 pthread_mutex_lock(&RSSQueueMutex);
849
850                 Put(RSSFetchUrls,
851                     SKEY(RSSAggr->Url),
852                     RSSAggr,
853                     DeleteRssCfg);
854
855                 pthread_mutex_unlock(&RSSQueueMutex);
856                 RSSCfg = RSSCfg->next;
857         }
858 }
859
860 /*
861  * Scan for rooms that have RSS client requests configured
862  */
863 void rssclient_scan(void) {
864         int RSSRoomCount, RSSCount;
865         rss_aggregator *rptr = NULL;
866         void *vrptr = NULL;
867         HashPos *it;
868         long len;
869         const char *Key;
870         time_t now = time(NULL);
871
872         /* Run no more than once every 15 minutes. */
873         if ((now - last_run) < 900) {
874                 EVRSSQ_syslog(LOG_DEBUG,
875                               "Client: polling interval not yet reached; last run was %ldm%lds ago",
876                               ((now - last_run) / 60),
877                               ((now - last_run) % 60)
878                 );
879                 return;
880         }
881
882         /*
883          * This is a simple concurrency check to make sure only one rssclient
884          * run is done at a time.
885          */
886         pthread_mutex_lock(&RSSQueueMutex);
887         RSSCount = GetCount(RSSFetchUrls);
888         RSSRoomCount = GetCount(RSSQueueRooms);
889         pthread_mutex_unlock(&RSSQueueMutex);
890
891         if ((RSSRoomCount > 0) || (RSSCount > 0)) {
892                 EVRSSQ_syslog(LOG_DEBUG,
893                               "rssclient: concurrency check failed; %d rooms and %d url's are queued",
894                               RSSRoomCount, RSSCount
895                         );
896                 abort();
897                 return;
898         }
899
900         become_session(&rss_CC);
901         EVRSSQM_syslog(LOG_DEBUG, "rssclient started");
902         CtdlForEachNetCfgRoom(rssclient_scan_room, NULL);
903
904         if (GetCount(RSSFetchUrls) > 0)
905         {
906                 pthread_mutex_lock(&RSSQueueMutex);
907                 EVRSSQ_syslog(LOG_DEBUG,
908                                "rssclient starting %d Clients",
909                                GetCount(RSSFetchUrls));
910                 
911                 it = GetNewHashPos(RSSFetchUrls, 0);
912                 while (!server_shutting_down &&
913                        GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
914                        (vrptr != NULL)) {
915                         rptr = (rss_aggregator *)vrptr;
916                         if (!rss_do_fetching(rptr))
917                                 UnlinkRSSAggregator(rptr);
918                 }
919                 DeleteHashPos(&it);
920                 pthread_mutex_unlock(&RSSQueueMutex);
921         }
922         else
923                 EVRSSQM_syslog(LOG_DEBUG, "Nothing to do.");
924
925         EVRSSQM_syslog(LOG_DEBUG, "rssclient ended");
926         return;
927 }
928
929 void rss_cleanup(void)
930 {
931         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
932         DeleteHash(&RSSFetchUrls);
933         DeleteHash(&RSSQueueRooms);
934 }
935
936 void LogDebugEnableRSSClient(const int n)
937 {
938         RSSClientDebugEnabled = n;
939 }
940
941
942 typedef struct __RSSVetoInfo {
943         StrBuf *ErrMsg;
944         time_t Now;
945         int Veto;
946 }RSSVetoInfo;
947
948 void rssclient_veto_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
949 {
950         RSSVetoInfo *Info = (RSSVetoInfo *) data;
951         const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient];
952
953         while (RSSCfg != NULL)
954         {
955                 if ((RSSCfg->last_known_good != 0) &&
956                     (RSSCfg->last_known_good + USETABLE_ANTIEXPIRE < Info->Now))
957                 {
958                         StrBufAppendPrintf(Info->ErrMsg,
959                                            "RSS feed not seen for a %d days:: <",
960                                            (Info->Now - RSSCfg->last_known_good) / (24 * 60 * 60));
961
962                         StrBufAppendBuf(Info->ErrMsg, RSSCfg->Url, 0);
963                         StrBufAppendBufPlain(Info->ErrMsg, HKEY(">\n"), 0);
964                 }
965                 RSSCfg = RSSCfg->next;
966         }
967 }
968
969 int RSSCheckUsetableVeto(StrBuf *ErrMsg)
970 {
971         RSSVetoInfo Info;
972
973         Info.ErrMsg = ErrMsg;
974         Info.Now = time (NULL);
975         Info.Veto = 0;
976
977         CtdlForEachNetCfgRoom(rssclient_veto_scan_room, &Info);
978
979         return Info.Veto;;
980 }
981
982
983
984
985 void ParseRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, OneRoomNetCfg *OneRNCFG)
986 {
987         RSSCfgLine *RSSCfg;
988
989         RSSCfg = (RSSCfgLine *) malloc (sizeof(RSSCfgLine));
990         RSSCfg->Url = NewStrBufPlain (NULL, StrLength (Line));
991         
992
993         StrBufExtract_NextToken(RSSCfg->Url, Line, &LinePos, '|');
994         RSSCfg->last_known_good = StrBufExtractNext_long(Line, &LinePos, '|');
995
996
997         RSSCfg->next = (RSSCfgLine *)OneRNCFG->NetConfigs[ThisOne->C];
998         OneRNCFG->NetConfigs[ThisOne->C] = (RoomNetCfgLine*) RSSCfg;
999 }
1000
1001 void SerializeRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *OutputBuffer, OneRoomNetCfg *RNCfg, RoomNetCfgLine *data)
1002 {
1003         RSSCfgLine *RSSCfg = (RSSCfgLine*) data;
1004
1005         StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0);
1006         StrBufAppendBufPlain(OutputBuffer, HKEY("|"), 0);
1007         StrBufAppendBufPlain(OutputBuffer, SKEY(RSSCfg->Url), 0);
1008         StrBufAppendPrintf(OutputBuffer, "|%ld\n", RSSCfg->last_known_good);
1009 }
1010
1011 void DeleteRSSClientCfgLine(const CfgLineType *ThisOne, RoomNetCfgLine **data)
1012 {
1013         RSSCfgLine *RSSCfg = (RSSCfgLine*) *data;
1014
1015         FreeStrBuf(&RSSCfg->Url);
1016         free(*data);
1017         *data = NULL;
1018 }
1019
1020
1021 CTDL_MODULE_INIT(rssclient)
1022 {
1023         if (!threading)
1024         {
1025                 CtdlRegisterTDAPVetoHook (RSSCheckUsetableVeto, CDB_USETABLE, 0);
1026
1027                 CtdlREGISTERRoomCfgType(rssclient, ParseRSSClientCfgLine, 0, 1, SerializeRSSClientCfgLine, DeleteRSSClientCfgLine);
1028                 pthread_mutex_init(&RSSQueueMutex, NULL);
1029                 RSSQueueRooms = NewHash(1, lFlathash);
1030                 RSSFetchUrls = NewHash(1, NULL);
1031                 syslog(LOG_INFO, "%s\n", curl_version());
1032                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
1033                 CtdlRegisterEVCleanupHook(rss_cleanup);
1034                 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
1035         }
1036         else
1037         {
1038                 CtdlFillSystemContext(&rss_CC, "rssclient");
1039         }
1040         return "rssclient";
1041 }