acae637871a5830a0b3aac56ead9ab2662b36738
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2016 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71 int RSSClientDebugEnabled = 0;
/*
 * Room number of the aggregator attached to the AsyncIO that is named `IO`
 * in the scope where the macro is expanded.  Used as a log prefix field.
 */
#define N ((rss_aggregator*)IO->Data)->Cfg.QRnumber

/*
 * Statement prefix: the statement following DBGLOG(LEVEL) is executed unless
 * LEVEL is LOG_DEBUG while RSSClientDebugEnabled is zero.
 */
#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))

/*
 * Logging helpers.  Each one expands to a single statement wrapped in
 * do { } while (0), so it can be used safely as the body of an unbraced
 * if/else without the hidden `if` of DBGLOG() capturing a following `else`.
 *
 * EVRSSC_syslog / EVRSSCM_syslog   - IO + session + room prefix, with / without
 *                                    format arguments; need `IO` in scope.
 * EVRSSQ_syslog / EVRSSQM_syslog   - plain "RSS" prefix, with / without
 *                                    format arguments; no `IO` required.
 * EVRSSCSM_syslog                  - IO + room prefix, no format arguments.
 */
#define EVRSSC_syslog(LEVEL, FORMAT, ...)                               \
	do {                                                            \
		DBGLOG(LEVEL) syslog(LEVEL,                             \
				     "%s[%ld]CC[%d][%ld]RSS" FORMAT,    \
				     IOSTR, IO->ID, CCID, N,            \
				     __VA_ARGS__);                      \
	} while (0)

#define EVRSSCM_syslog(LEVEL, FORMAT)                                   \
	do {                                                            \
		DBGLOG(LEVEL) syslog(LEVEL,                             \
				     "%s[%ld]CC[%d][%ld]RSS" FORMAT,    \
				     IOSTR, IO->ID, CCID, N);           \
	} while (0)

#define EVRSSQ_syslog(LEVEL, FORMAT, ...)                               \
	do {                                                            \
		DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT,               \
				     __VA_ARGS__);                      \
	} while (0)

#define EVRSSQM_syslog(LEVEL, FORMAT)                                   \
	do {                                                            \
		DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT);              \
	} while (0)

#define EVRSSCSM_syslog(LEVEL, FORMAT)                                  \
	do {                                                            \
		DBGLOG(LEVEL) syslog(LEVEL, "%s[%ld][%ld]RSS" FORMAT,   \
				     IOSTR, IO->ID, N);                 \
	} while (0)
95
96 typedef enum _RSSState {
97         eRSSCreated,
98         eRSSFetching,
99         eRSSFailure,
100         eRSSParsing,
101         eRSSUT
102 } RSSState;
103 ConstStr RSSStates[] = {
104         {HKEY("Aggregator created")},
105         {HKEY("Fetching content")},
106         {HKEY("Failed")},
107         {HKEY("parsing content")},
108         {HKEY("checking usetable")}
109 };
110
111
112 static size_t GetLocationString( void *ptr, size_t size, size_t nmemb, void *userdata)
113 {
114 #define LOCATION "location"
115         if (strncasecmp((char*)ptr, LOCATION, sizeof(LOCATION) - 1) == 0)
116         {
117                 AsyncIO *IO = (AsyncIO *) userdata;
118                 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
119
120                 char *pch = (char*) ptr;
121                 char *pche;
122                 
123                 pche = pch + (size * nmemb);
124                 pch += sizeof(LOCATION);
125                 
126                 while (isspace(*pch) || (*pch == ':'))
127                         pch ++;
128
129                 while (isspace(*pche) || (*pche == '\0'))
130                         pche--;
131                 if (RSSAggr->RedirectUrl == NULL) {
132                         RSSAggr->RedirectUrl = NewStrBufPlain(pch, pche - pch + 1);
133                 }
134                 else {
135                         FlushStrBuf(RSSAggr->RedirectUrl);
136                         StrBufPlain(RSSAggr->RedirectUrl, pch, pche - pch + 1); 
137                 }
138         }
139         return size * nmemb;
140 }
141
142 static void SetRSSState(AsyncIO *IO, RSSState State)
143 {
144         CitContext* CCC = IO->CitContext;
145         if (CCC != NULL)
146                 memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1);
147 }
148
149 void DeleteRoomReference(long QRnumber)
150 {
151         HashPos *At;
152         long HKLen;
153         const char *HK;
154         void *vData = NULL;
155         rss_room_counter *pRoomC;
156
157         At = GetNewHashPos(RSSQueueRooms, 0);
158
159         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
160         {
161                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
162                 if (vData != NULL)
163                 {
164                         pRoomC = (rss_room_counter *) vData;
165                         pRoomC->count --;
166                         if (pRoomC->count == 0)
167                                 DeleteEntryFromHash(RSSQueueRooms, At);
168                 }
169         }
170         DeleteHashPos(&At);
171 }
172
173 void UnlinkRooms(rss_aggregator *RSSAggr)
174 {
175         DeleteRoomReference(RSSAggr->Cfg.QRnumber);
176         if (RSSAggr->OtherQRnumbers != NULL)
177         {
178                 long HKLen;
179                 const char *HK;
180                 HashPos *At;
181                 void *vData;
182
183                 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
184                 while (! server_shutting_down &&
185                        GetNextHashPos(RSSAggr->OtherQRnumbers,
186                                       At,
187                                       &HKLen, &HK,
188                                       &vData) &&
189                        (vData != NULL))
190                 {
191                         pRSSConfig *Data = (pRSSConfig*) vData;
192                         DeleteRoomReference(Data->QRnumber);
193                 }
194
195                 DeleteHashPos(&At);
196         }
197 }
198
199 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
200 {
201         HashPos *At;
202
203         pthread_mutex_lock(&RSSQueueMutex);
204         UnlinkRooms(RSSAggr);
205
206         At = GetNewHashPos(RSSFetchUrls, 0);
207         if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
208         {
209                 DeleteEntryFromHash(RSSFetchUrls, At);
210         }
211         DeleteHashPos(&At);
212         last_run = time(NULL);
213         pthread_mutex_unlock(&RSSQueueMutex);
214 }
215
216 void DeleteRssCfg(void *vptr)
217 {
218         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
219         AsyncIO *IO = &RSSAggr->IO;
220
221         if (IO->CitContext != NULL)
222                 EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
223
224         FreeStrBuf(&RSSAggr->Url);
225         FreeStrBuf(&RSSAggr->RedirectUrl);
226         FreeStrBuf(&RSSAggr->rooms);
227         FreeStrBuf(&RSSAggr->CData);
228         FreeStrBuf(&RSSAggr->Key);
229         DeleteHash(&RSSAggr->OtherQRnumbers);
230
231         DeleteHashPos (&RSSAggr->Pos);
232         DeleteHash (&RSSAggr->Messages);
233         if (RSSAggr->recp.recp_room != NULL)
234                 free(RSSAggr->recp.recp_room);
235
236
237         if (RSSAggr->Item != NULL)
238         {
239                 flush_rss_item(RSSAggr->Item);
240
241                 free(RSSAggr->Item);
242         }
243
244         FreeAsyncIOContents(&RSSAggr->IO);
245         memset(RSSAggr, 0, sizeof(rss_aggregator));
246         free(RSSAggr);
247 }
248
249 eNextState RSSAggregator_Terminate(AsyncIO *IO)
250 {
251         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
252
253         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
254
255         StopCurlWatchers(IO);
256         UnlinkRSSAggregator(RSSAggr);
257         return eAbort;
258 }
259
260 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
261 {
262         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
263
264         EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
265
266
267         StopDBWatchers(&RSSAggr->IO);
268         UnlinkRSSAggregator(RSSAggr);
269         return eAbort;
270 }
271
272 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
273 {
274         const char *pUrl;
275         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
276
277         pUrl = IO->ConnectMe->PlainUrl;
278         if (pUrl == NULL)
279                 pUrl = "";
280
281         EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
282
283         StopCurlWatchers(IO);
284         UnlinkRSSAggregator(RSSAggr);
285         return eAbort;
286 }
287
288 void AppendLink(StrBuf *Message,
289                 StrBuf *link,
290                 StrBuf *LinkTitle,
291                 const char *Title)
292 {
293         if (StrLength(link) > 0)
294         {
295                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
296                 StrBufAppendBuf(Message, link, 0);
297                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
298                 if (StrLength(LinkTitle) > 0)
299                         StrBufAppendBuf(Message, LinkTitle, 0);
300                 else if ((Title != NULL) && !IsEmptyStr(Title))
301                         StrBufAppendBufPlain(Message, Title, -1, 0);
302                 else
303                         StrBufAppendBuf(Message, link, 0);
304                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
305         }
306 }
307
308
309 int rss_format_item(AsyncIO *IO, networker_save_message *SaveMsg)
310 {
311         StrBuf *Message;
312         int msglen = 0;
313
314         if (StrLength(SaveMsg->description) + 
315             StrLength(SaveMsg->link) + 
316             StrLength(SaveMsg->linkTitle) + 
317             StrLength(SaveMsg->reLink) +
318             StrLength(SaveMsg->reLinkTitle) +
319             StrLength(SaveMsg->title) == 0)
320         {
321                 EVRSSCM_syslog(LOG_INFO, "Refusing to save empty message.");
322                 return 0;
323         }
324
325         CM_Flush(&SaveMsg->Msg);
326
327         if (SaveMsg->author_or_creator != NULL) {
328
329                 char *From;
330                 StrBuf *Encoded = NULL;
331                 int FromAt;
332
333                 From = html_to_ascii(ChrPtr(SaveMsg->author_or_creator),
334                                      StrLength(SaveMsg->author_or_creator),
335                                      512, 0);
336                 StrBufPlain(SaveMsg->author_or_creator, From, -1);
337                 StrBufTrim(SaveMsg->author_or_creator);
338                 free(From);
339
340                 FromAt = strchr(ChrPtr(SaveMsg->author_or_creator), '@') != NULL;
341                 if (!FromAt && StrLength (SaveMsg->author_email) > 0)
342                 {
343                         StrBufRFC2047encode(&Encoded, SaveMsg->author_or_creator);
344                         CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded);
345                         CM_SetAsFieldSB(&SaveMsg->Msg, eMessagePath, &SaveMsg->author_email);
346                 }
347                 else
348                 {
349                         if (FromAt)
350                         {
351                                 CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &SaveMsg->author_or_creator);
352                                 CM_CopyField(&SaveMsg->Msg, eMessagePath, eAuthor);
353                         }
354                         else
355                         {
356                                 StrBufRFC2047encode(&Encoded,
357                                                     SaveMsg->author_or_creator);
358                                 CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded);
359                                 CM_SetField(&SaveMsg->Msg, eMessagePath, HKEY("rss@localhost"));
360
361                         }
362                 }
363         }
364         else {
365                 CM_SetField(&SaveMsg->Msg, eAuthor, HKEY("rss"));
366         }
367
368         CM_SetField(&SaveMsg->Msg, eNodeName, CtdlGetConfigStr("c_nodename"), strlen(CtdlGetConfigStr("c_nodename")));
369         if (SaveMsg->title != NULL) {
370                 long len;
371                 char *Sbj;
372                 StrBuf *Encoded, *QPEncoded;
373
374                 QPEncoded = NULL;
375                 StrBufSpaceToBlank(SaveMsg->title);
376                 len = StrLength(SaveMsg->title);
377                 Sbj = html_to_ascii(ChrPtr(SaveMsg->title), len, 512, 0);
378                 if (!IsEmptyStr(Sbj)) {
379                         len = strlen(Sbj);
380                         if ((Sbj[len - 1] == '\n'))
381                         {
382                                 len --;
383                                 Sbj[len] = '\0';
384                         }
385                         Encoded = NewStrBufPlain(Sbj, len);
386                 
387
388                         StrBufTrim(Encoded);
389                         StrBufRFC2047encode(&QPEncoded, Encoded);
390                         
391                         CM_SetAsFieldSB(&SaveMsg->Msg, eMsgSubject, &QPEncoded);
392                         FreeStrBuf(&Encoded);
393                 }
394                 if (Sbj != NULL) {
395                         free(Sbj);
396                 }
397         }
398         if (SaveMsg->link == NULL)
399                 SaveMsg->link = NewStrBufPlain(HKEY(""));
400
401 #if 0 /* temporarily disable shorter urls. */
402         SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
403                 GetShorterUrls(SaveMsg->description);
404 #endif
405
406         msglen += 1024 + StrLength(SaveMsg->link) + StrLength(SaveMsg->description) ;
407
408         Message = NewStrBufPlain(NULL, msglen);
409
410         StrBufPlain(Message, HKEY(
411                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
412                             "<html><body>\n"));
413 #if 0 /* disable shorter url for now. */
414         SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
415 #endif
416         StrBufAppendBuf(Message, SaveMsg->description, 0);
417         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
418
419         AppendLink(Message, SaveMsg->link, SaveMsg->linkTitle, NULL);
420         AppendLink(Message, SaveMsg->reLink, SaveMsg->reLinkTitle, "Reply to this");
421         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
422
423         SaveMsg->Message = Message;
424         return 1;
425 }
426
427 eNextState RSSSaveMessage(AsyncIO *IO)
428 {
429         long len;
430         const char *Key;
431         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
432
433         if (rss_format_item(IO, RSSAggr->ThisMsg))
434         {
435                 CM_SetAsFieldSB(&RSSAggr->ThisMsg->Msg, eMesageText,
436                                        &RSSAggr->ThisMsg->Message);
437
438                 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
439                 
440                 /* write the uidl to the use table so we don't store this item again */
441                 
442                 CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, EvGetNow(IO), 0, eWrite, CCID, IO->ID);
443         }
444
445         if (GetNextHashPos(RSSAggr->Messages,
446                            RSSAggr->Pos,
447                            &len, &Key,
448                            (void**) &RSSAggr->ThisMsg))
449                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
450         else
451                 return eAbort;
452 }
453
454 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
455 {
456         static const time_t antiExpire = USETABLE_ANTIEXPIRE_HIRES;
457 #ifndef DEBUG_RSS
458         time_t seenstamp = 0;
459         const char *Key;
460         long len;
461         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
462
463         /* Find out if we've already seen this item */
464 // todo: expiry?
465         SetRSSState(IO, eRSSUT);
466         seenstamp = CheckIfAlreadySeen("RSS Item Seen",
467                                        Ctx->ThisMsg->MsgGUID,
468                                        EvGetNow(IO),
469                                        antiExpire,
470                                        eCheckUpdate,
471                                        CCID, IO->ID);
472         if (seenstamp != 0)
473         {
474                 /* Item has already been seen */
475                 EVRSSC_syslog(LOG_DEBUG,
476                               "%s has already been seen - %ld < %ld",
477                               ChrPtr(Ctx->ThisMsg->MsgGUID),
478                               seenstamp, antiExpire);
479
480                 SetRSSState(IO, eRSSParsing);
481
482                 if (GetNextHashPos(Ctx->Messages,
483                                    Ctx->Pos,
484                                    &len, &Key,
485                                    (void**) &Ctx->ThisMsg))
486                         return NextDBOperation(
487                                 IO,
488                                 RSS_FetchNetworkUsetableEntry);
489                 else
490                         return eAbort;
491         }
492         else
493 #endif
494         {
495                 /* Item has already been seen */
496                 EVRSSC_syslog(LOG_DEBUG,
497                               "%s Parsing - %ld >= %ld",
498                               ChrPtr(Ctx->ThisMsg->MsgGUID),
499                               seenstamp, antiExpire);
500                 SetRSSState(IO, eRSSParsing);
501
502                 NextDBOperation(IO, RSSSaveMessage);
503                 return eSendMore;
504         }
505         return eSendMore;
506 }
507
508 void UpdateLastKnownGood(pRSSConfig *pCfg, time_t now)
509 {
510         OneRoomNetCfg *pRNCfg;
511         begin_critical_section(S_NETCONFIGS);
512         pRNCfg = CtdlGetNetCfgForRoom(pCfg->QRnumber);
513         if (pRNCfg != NULL)
514         {
515                 RSSCfgLine *RSSCfg = (RSSCfgLine *)pRNCfg->NetConfigs[rssclient];
516
517                 while (RSSCfg != NULL)
518                 {
519                         if (RSSCfg == pCfg->pCfg)
520                                 break;
521
522                         RSSCfg = RSSCfg->next;
523                 }
524                 if (RSSCfg != NULL)
525                 {
526                         pRNCfg->changed = 1;
527                         RSSCfg->last_known_good = now;
528                 }
529         }
530
531         SaveRoomNetConfigFile(pRNCfg, pCfg->QRnumber);
532         FreeRoomNetworkStruct(&pRNCfg);
533         end_critical_section(S_NETCONFIGS);
534 }
535
536 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
537 {
538         HashPos *it = NULL;
539         long len;
540         const char *Key;
541         pRSSConfig *pCfg;
542         u_char rawdigest[MD5_DIGEST_LEN];
543         struct MD5Context md5context;
544         StrBuf *guid;
545         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
546
547
548         if ((IO->HttpReq.httpcode >= 300) &&
549             (IO->HttpReq.httpcode < 400)  && 
550             (Ctx->RedirectUrl != NULL)) {
551
552                 StrBuf *ErrMsg;
553                 long lens[2];
554                 const char *strs[2];
555
556                 SetRSSState(IO, eRSSFailure);
557                 ErrMsg = NewStrBuf();
558                 if (IO) EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n",
559                               IO->HttpReq.httpcode);
560                 strs[0] = ChrPtr(Ctx->Url);
561                 lens[0] = StrLength(Ctx->Url);
562
563                 strs[1] = ChrPtr(Ctx->rooms);
564                 lens[1] = StrLength(Ctx->rooms);
565
566                 if (IO->HttpReq.CurlError == NULL)
567                         IO->HttpReq.CurlError = "";
568
569                 StrBufPrintf(ErrMsg,
570                              "Error while RSS-Aggregation Run of %s\n"
571                              " need a 200, got a %ld !\n"
572                              " Curl Error message: \n%s / %s\n"
573                              " Redirect header points to: %s\n"
574                              " Response text was: \n"
575                              " \n %s\n",
576                              ChrPtr(Ctx->Url),
577                              IO->HttpReq.httpcode,
578                              IO->HttpReq.errdesc,
579                              IO->HttpReq.CurlError,
580                              ChrPtr(Ctx->RedirectUrl),
581                              ChrPtr(IO->HttpReq.ReplyData)
582                         );
583
584                 CtdlAideFPMessage(
585                         ChrPtr(ErrMsg),
586                         "RSS Aggregation run failure",
587                         2, strs, (long*) &lens,
588                         CCID,
589                         IO->ID,
590                         EvGetNow(IO));
591                 
592                 FreeStrBuf(&ErrMsg);
593                 EVRSSC_syslog(LOG_DEBUG,
594                               "RSS feed returned an invalid http status code. <%s><HTTP %ld>\n",
595                               ChrPtr(Ctx->Url),
596                               IO->HttpReq.httpcode);
597                 return eAbort;
598         }
599         else if (IO->HttpReq.httpcode != 200)
600         {
601                 StrBuf *ErrMsg;
602                 long lens[2];
603                 const char *strs[2];
604
605                 SetRSSState(IO, eRSSFailure);
606                 ErrMsg = NewStrBuf();
607                 if (IO) EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n",
608                               IO->HttpReq.httpcode);
609                 strs[0] = ChrPtr(Ctx->Url);
610                 lens[0] = StrLength(Ctx->Url);
611
612                 strs[1] = ChrPtr(Ctx->rooms);
613                 lens[1] = StrLength(Ctx->rooms);
614
615                 if (IO->HttpReq.CurlError == NULL)
616                         IO->HttpReq.CurlError = "";
617
618                 StrBufPrintf(ErrMsg,
619                              "Error while RSS-Aggregation Run of %s\n"
620                              " need a 200, got a %ld !\n"
621                              " Curl Error message: \n%s / %s\n"
622                              " Response text was: \n"
623                              " \n %s\n",
624                              ChrPtr(Ctx->Url),
625                              IO->HttpReq.httpcode,
626                              IO->HttpReq.errdesc,
627                              IO->HttpReq.CurlError,
628                              ChrPtr(IO->HttpReq.ReplyData)
629                         );
630
631                 CtdlAideFPMessage(
632                         ChrPtr(ErrMsg),
633                         "RSS Aggregation run failure",
634                         2, strs, (long*) &lens,
635                         CCID,
636                         IO->ID,
637                         EvGetNow(IO));
638                 
639                 FreeStrBuf(&ErrMsg);
640                 EVRSSC_syslog(LOG_DEBUG,
641                               "RSS feed returned an invalid http status code. <%s><HTTP %ld>\n",
642                               ChrPtr(Ctx->Url),
643                               IO->HttpReq.httpcode);
644                 return eAbort;
645         }
646
647         pCfg = &Ctx->Cfg;
648
649         while (pCfg != NULL)
650         {
651                 UpdateLastKnownGood (pCfg, EvGetNow(IO));
652                 if ((Ctx->roomlist_parts > 1) && 
653                     (it == NULL))
654                 {
655                         it = GetNewHashPos(RSSFetchUrls, 0);
656                 }
657                 if (it != NULL)
658                 {
659                         void *vptr;
660                         if (GetNextHashPos(Ctx->OtherQRnumbers, it, &len, &Key, &vptr))
661                                 pCfg = vptr;
662                         else
663                                 pCfg = NULL;
664                 }
665                 else 
666                         pCfg = NULL;
667         }
668         DeleteHashPos (&it);
669
670         SetRSSState(IO, eRSSUT);
671
672         MD5Init(&md5context);
673
674         MD5Update(&md5context,
675                   (const unsigned char*)SKEY(IO->HttpReq.ReplyData));
676
677         MD5Update(&md5context,
678                   (const unsigned char*)SKEY(Ctx->Url));
679
680         MD5Final(rawdigest, &md5context);
681         guid = NewStrBufPlain(NULL,
682                               MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
683         StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
684         StrBufAppendBufPlain(guid, HKEY("_rssFM"), 0);
685         if (StrLength(guid) > 40)
686                 StrBufCutAt(guid, 40, NULL);
687         /* Find out if we've already seen this item */
688
689 #ifndef DEBUG_RSS
690
691         if (CheckIfAlreadySeen("RSS Whole",
692                                guid,
693                                EvGetNow(IO),
694                                EvGetNow(IO) - USETABLE_ANTIEXPIRE,
695                                eUpdate,
696                                CCID, IO->ID)
697             != 0)
698         {
699                 FreeStrBuf(&guid);
700
701                 EVRSSC_syslog(LOG_DEBUG, "RSS feed already seen. <%s>\n", ChrPtr(Ctx->Url));
702                 return eAbort;
703         }
704         FreeStrBuf(&guid);
705 #endif
706         SetRSSState(IO, eRSSParsing);
707         return RSSAggregator_ParseReply(IO);
708 }
709
710 eNextState RSSAggregator_FinishHttp(AsyncIO *IO)
711 {
712         return CurlQueueDBOperation(IO, RSSAggregator_AnalyseReply);
713 }
714
715 /*
716  * Begin a feed parse
717  */
718 int rss_do_fetching(rss_aggregator *RSSAggr)
719 {
720         AsyncIO         *IO = &RSSAggr->IO;
721         rss_item *ri;
722         time_t now;
723         CURLcode sta;
724         CURL *chnd;
725
726
727         now = time(NULL);
728
729         if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
730                 return 0;
731
732         ri = (rss_item*) malloc(sizeof(rss_item));
733         memset(ri, 0, sizeof(rss_item));
734         RSSAggr->Item = ri;
735
736         if (! InitcURLIOStruct(&RSSAggr->IO,
737                                RSSAggr,
738                                "Citadel RSS Client",
739                                RSSAggregator_FinishHttp,
740                                RSSAggregator_Terminate,
741                                RSSAggregator_TerminateDB,
742                                RSSAggregator_ShutdownAbort))
743         {
744                 EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
745                 return 0;
746         }
747         chnd = IO->HttpReq.chnd;
748         OPT(HEADERDATA, IO);
749         OPT(HEADERFUNCTION, GetLocationString);
750         SetRSSState(IO, eRSSCreated);
751
752         safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
753                     ChrPtr(RSSAggr->Url),
754                     sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
755
756         EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
757         ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
758         CurlPrepareURL(RSSAggr->IO.ConnectMe);
759
760         SetRSSState(IO, eRSSFetching);
761         QueueCurlContext(&RSSAggr->IO);
762         return 1;
763 }
764
765 /*
766  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
767  */
768 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
769 {
770         const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient];
771         rss_aggregator *RSSAggr = NULL;
772         rss_aggregator *use_this_RSSAggr = NULL;
773         void *vptr;
774
775         TRACE;
776
777         pthread_mutex_lock(&RSSQueueMutex);
778         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
779         {
780                 EVRSSQ_syslog(LOG_DEBUG,
781                               "rssclient: [%ld] %s already in progress.\n",
782                               qrbuf->QRnumber,
783                               qrbuf->QRname);
784                 pthread_mutex_unlock(&RSSQueueMutex);
785                 return;
786         }
787         pthread_mutex_unlock(&RSSQueueMutex);
788
789         if (server_shutting_down) return;
790
791         while (RSSCfg != NULL)
792         {
793                 pthread_mutex_lock(&RSSQueueMutex);
794                 GetHash(RSSFetchUrls,
795                         SKEY(RSSCfg->Url),
796                         &vptr);
797
798                 use_this_RSSAggr = (rss_aggregator *)vptr;
799                 if (use_this_RSSAggr != NULL)
800                 {
801                         pRSSConfig *pRSSCfg;
802
803                         StrBufAppendBufPlain(
804                                 use_this_RSSAggr->rooms,
805                                 qrbuf->QRname,
806                                 -1, 0);
807                         if (use_this_RSSAggr->roomlist_parts==1)
808                         {
809                                 use_this_RSSAggr->OtherQRnumbers
810                                         = NewHash(1, lFlathash);
811                         }
812
813                         pRSSCfg = (pRSSConfig *) malloc(sizeof(pRSSConfig));
814
815                         pRSSCfg->QRnumber = qrbuf->QRnumber;
816                         pRSSCfg->pCfg = RSSCfg;
817
818                         Put(use_this_RSSAggr->OtherQRnumbers,
819                             LKEY(qrbuf->QRnumber),
820                             pRSSCfg,
821                             NULL);
822                         use_this_RSSAggr->roomlist_parts++;
823
824                         pthread_mutex_unlock(&RSSQueueMutex);
825
826                         RSSCfg = RSSCfg->next;
827                         continue;
828                 }
829                 pthread_mutex_unlock(&RSSQueueMutex);
830
831                 RSSAggr = (rss_aggregator *) malloc(
832                         sizeof(rss_aggregator));
833
834                 memset (RSSAggr, 0, sizeof(rss_aggregator));
835                 RSSAggr->Cfg.QRnumber = qrbuf->QRnumber;
836                 RSSAggr->Cfg.pCfg = RSSCfg;
837                 RSSAggr->roomlist_parts = 1;
838                 RSSAggr->Url = NewStrBufDup(RSSCfg->Url);
839
840                 RSSAggr->ItemType = RSS_UNSET;
841
842                 RSSAggr->rooms = NewStrBufPlain(
843                         qrbuf->QRname, -1);
844
845                 pthread_mutex_lock(&RSSQueueMutex);
846
847                 Put(RSSFetchUrls,
848                     SKEY(RSSAggr->Url),
849                     RSSAggr,
850                     DeleteRssCfg);
851
852                 pthread_mutex_unlock(&RSSQueueMutex);
853                 RSSCfg = RSSCfg->next;
854         }
855 }
856
857 /*
858  * Scan for rooms that have RSS client requests configured
859  */
860 void rssclient_scan(void) {
861         int RSSRoomCount, RSSCount;
862         rss_aggregator *rptr = NULL;
863         void *vrptr = NULL;
864         HashPos *it;
865         long len;
866         const char *Key;
867         time_t now = time(NULL);
868
869         /* Run no more than once every 15 minutes. */
870         if ((now - last_run) < 900) {
871                 EVRSSQ_syslog(LOG_DEBUG,
872                               "Client: polling interval not yet reached; last run was %ldm%lds ago",
873                               ((now - last_run) / 60),
874                               ((now - last_run) % 60)
875                 );
876                 return;
877         }
878
879         /*
880          * This is a simple concurrency check to make sure only one rssclient
881          * run is done at a time.
882          */
883         pthread_mutex_lock(&RSSQueueMutex);
884         RSSCount = GetCount(RSSFetchUrls);
885         RSSRoomCount = GetCount(RSSQueueRooms);
886         pthread_mutex_unlock(&RSSQueueMutex);
887
888         if ((RSSRoomCount > 0) || (RSSCount > 0)) {
889                 EVRSSQ_syslog(LOG_DEBUG,
890                               "rssclient: concurrency check failed; %d rooms and %d url's are queued",
891                               RSSRoomCount, RSSCount
892                         );
893                 return;
894         }
895
896         become_session(&rss_CC);
897         EVRSSQM_syslog(LOG_DEBUG, "rssclient started");
898         CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient);
899
900         if (GetCount(RSSFetchUrls) > 0)
901         {
902                 pthread_mutex_lock(&RSSQueueMutex);
903                 EVRSSQ_syslog(LOG_DEBUG,
904                                "rssclient starting %d Clients",
905                                GetCount(RSSFetchUrls));
906                 
907                 it = GetNewHashPos(RSSFetchUrls, 0);
908                 while (!server_shutting_down &&
909                        GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
910                        (vrptr != NULL)) {
911                         rptr = (rss_aggregator *)vrptr;
912                         if (!rss_do_fetching(rptr))
913                                 UnlinkRSSAggregator(rptr);
914                 }
915                 DeleteHashPos(&it);
916                 pthread_mutex_unlock(&RSSQueueMutex);
917         }
918         else
919                 EVRSSQM_syslog(LOG_DEBUG, "Nothing to do.");
920
921         EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
922         return;
923 }
924
925 void rss_cleanup(void)
926 {
927         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
928         DeleteHash(&RSSFetchUrls);
929         DeleteHash(&RSSQueueRooms);
930 }
931
932 void LogDebugEnableRSSClient(const int n)
933 {
934         RSSClientDebugEnabled = n;
935 }
936
937
938 typedef struct __RSSVetoInfo {
939         StrBuf *ErrMsg;
940         time_t Now;
941         int Veto;
942 }RSSVetoInfo;
943
944 void rssclient_veto_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
945 {
946         RSSVetoInfo *Info = (RSSVetoInfo *) data;
947         const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient];
948
949         while (RSSCfg != NULL)
950         {
951                 if ((RSSCfg->last_known_good != 0) &&
952                     (RSSCfg->last_known_good + USETABLE_ANTIEXPIRE < Info->Now))
953                 {
954                         StrBufAppendPrintf(Info->ErrMsg,
955                                            "RSS feed not seen for a %d days:: <",
956                                            (Info->Now - RSSCfg->last_known_good) / (24 * 60 * 60));
957
958                         StrBufAppendBuf(Info->ErrMsg, RSSCfg->Url, 0);
959                         StrBufAppendBufPlain(Info->ErrMsg, HKEY(">\n"), 0);
960                 }
961                 RSSCfg = RSSCfg->next;
962         }
963 }
964
965 int RSSCheckUsetableVeto(StrBuf *ErrMsg)
966 {
967         RSSVetoInfo Info;
968
969         Info.ErrMsg = ErrMsg;
970         Info.Now = time (NULL);
971         Info.Veto = 0;
972
973         CtdlForEachNetCfgRoom(rssclient_veto_scan_room, &Info, rssclient);
974
975         return Info.Veto;;
976 }
977
978
979
980
981 void ParseRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, OneRoomNetCfg *OneRNCFG)
982 {
983         RSSCfgLine *RSSCfg;
984
985         RSSCfg = (RSSCfgLine *) malloc (sizeof(RSSCfgLine));
986         RSSCfg->Url = NewStrBufPlain (NULL, StrLength (Line));
987         
988
989         StrBufExtract_NextToken(RSSCfg->Url, Line, &LinePos, '|');
990         RSSCfg->last_known_good = StrBufExtractNext_long(Line, &LinePos, '|');
991
992
993         RSSCfg->next = (RSSCfgLine *)OneRNCFG->NetConfigs[ThisOne->C];
994         OneRNCFG->NetConfigs[ThisOne->C] = (RoomNetCfgLine*) RSSCfg;
995 }
996
997 void SerializeRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *OutputBuffer, OneRoomNetCfg *RNCfg, RoomNetCfgLine *data)
998 {
999         RSSCfgLine *RSSCfg = (RSSCfgLine*) data;
1000
1001         StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0);
1002         StrBufAppendBufPlain(OutputBuffer, HKEY("|"), 0);
1003         StrBufAppendBufPlain(OutputBuffer, SKEY(RSSCfg->Url), 0);
1004         StrBufAppendPrintf(OutputBuffer, "|%ld\n", RSSCfg->last_known_good);
1005 }
1006
1007 void DeleteRSSClientCfgLine(const CfgLineType *ThisOne, RoomNetCfgLine **data)
1008 {
1009         RSSCfgLine *RSSCfg = (RSSCfgLine*) *data;
1010
1011         FreeStrBuf(&RSSCfg->Url);
1012         free(*data);
1013         *data = NULL;
1014 }
1015
1016
1017 CTDL_MODULE_INIT(rssclient)
1018 {
1019         if (!threading)
1020         {
1021                 CtdlRegisterTDAPVetoHook (RSSCheckUsetableVeto, CDB_USETABLE, 0);
1022
1023                 CtdlREGISTERRoomCfgType(rssclient, ParseRSSClientCfgLine, 0, 1, SerializeRSSClientCfgLine, DeleteRSSClientCfgLine);
1024                 pthread_mutex_init(&RSSQueueMutex, NULL);
1025                 RSSQueueRooms = NewHash(1, lFlathash);
1026                 RSSFetchUrls = NewHash(1, NULL);
1027                 syslog(LOG_INFO, "%s\n", curl_version());
1028                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
1029                 CtdlRegisterEVCleanupHook(rss_cleanup);
1030                 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
1031         }
1032         else
1033         {
1034                 CtdlFillSystemContext(&rss_CC, "rssclient");
1035         }
1036         return "rssclient";
1037 }