Learned that LOG_ALERT is not really appropriate for the kind of things
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2016 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  */
14
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <stdio.h>
18
19 #if TIME_WITH_SYS_TIME
20 # include <sys/time.h>
21 # include <time.h>
22 #else
23 # if HAVE_SYS_TIME_H
24 #include <sys/time.h>
25 # else
26 #include <time.h>
27 # endif
28 #endif
29
30 #include <ctype.h>
31 #include <string.h>
32 #include <errno.h>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <expat.h>
36 #include <curl/curl.h>
37 #include <libcitadel.h>
38 #include "citadel.h"
39 #include "server.h"
40 #include "citserver.h"
41 #include "support.h"
42 #include "config.h"
43 #include "threads.h"
44 #include "ctdl_module.h"
45 #include "msgbase.h"
46 #include "parsedate.h"
47 #include "database.h"
48 #include "citadel_dirs.h"
49 #include "md5.h"
50 #include "context.h"
51 #include "event_client.h"
52 #include "rss_atom_parser.h"
53
54
55 #define TMP_MSGDATA 0xFF
56 #define TMP_SHORTER_URL_OFFSET 0xFE
57 #define TMP_SHORTER_URLS 0xFD
58
59 time_t last_run = 0L;
60
61 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
62 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
63 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
64
65 eNextState RSSAggregator_Terminate(AsyncIO *IO);
66 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
67 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
68 struct CitContext rss_CC;
69
70 struct rssnetcfg *rnclist = NULL;
71 int RSSClientDebugEnabled = 0;
72 #define N ((rss_aggregator*)IO->Data)->Cfg.QRnumber
73
74 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
75
76 typedef enum _RSSState {
77         eRSSCreated,
78         eRSSFetching,
79         eRSSFailure,
80         eRSSParsing,
81         eRSSUT
82 } RSSState;
83 ConstStr RSSStates[] = {
84         {HKEY("Aggregator created")},
85         {HKEY("Fetching content")},
86         {HKEY("Failed")},
87         {HKEY("parsing content")},
88         {HKEY("checking usetable")}
89 };
90
91
92 static size_t GetLocationString( void *ptr, size_t size, size_t nmemb, void *userdata)
93 {
94 #define LOCATION "location"
95         if (strncasecmp((char*)ptr, LOCATION, sizeof(LOCATION) - 1) == 0)
96         {
97                 AsyncIO *IO = (AsyncIO *) userdata;
98                 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
99
100                 char *pch = (char*) ptr;
101                 char *pche;
102                 
103                 pche = pch + (size * nmemb);
104                 pch += sizeof(LOCATION);
105                 
106                 while (isspace(*pch) || (*pch == ':'))
107                         pch ++;
108
109                 while (isspace(*pche) || (*pche == '\0'))
110                         pche--;
111                 if (RSSAggr->RedirectUrl == NULL) {
112                         RSSAggr->RedirectUrl = NewStrBufPlain(pch, pche - pch + 1);
113                 }
114                 else {
115                         FlushStrBuf(RSSAggr->RedirectUrl);
116                         StrBufPlain(RSSAggr->RedirectUrl, pch, pche - pch + 1); 
117                 }
118         }
119         return size * nmemb;
120 }
121
122
123 static void SetRSSState(AsyncIO *IO, RSSState State)
124 {
125         CitContext* CCC = IO->CitContext;
126         if (CCC != NULL) {
127                 memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1);
128         }
129 }
130
131
132 void DeleteRoomReference(long QRnumber)
133 {
134         HashPos *At;
135         long HKLen;
136         const char *HK;
137         void *vData = NULL;
138         rss_room_counter *pRoomC;
139
140         At = GetNewHashPos(RSSQueueRooms, 0);
141
142         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
143         {
144                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
145                 if (vData != NULL)
146                 {
147                         pRoomC = (rss_room_counter *) vData;
148                         pRoomC->count --;
149                         if (pRoomC->count == 0)
150                                 DeleteEntryFromHash(RSSQueueRooms, At);
151                 }
152         }
153         DeleteHashPos(&At);
154 }
155
156 void UnlinkRooms(rss_aggregator *RSSAggr)
157 {
158         DeleteRoomReference(RSSAggr->Cfg.QRnumber);
159         if (RSSAggr->OtherQRnumbers != NULL)
160         {
161                 long HKLen;
162                 const char *HK;
163                 HashPos *At;
164                 void *vData;
165
166                 At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
167                 while (! server_shutting_down &&
168                        GetNextHashPos(RSSAggr->OtherQRnumbers,
169                                       At,
170                                       &HKLen, &HK,
171                                       &vData) &&
172                        (vData != NULL))
173                 {
174                         pRSSConfig *Data = (pRSSConfig*) vData;
175                         DeleteRoomReference(Data->QRnumber);
176                 }
177
178                 DeleteHashPos(&At);
179         }
180 }
181
182 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
183 {
184         HashPos *At;
185
186         pthread_mutex_lock(&RSSQueueMutex);
187         UnlinkRooms(RSSAggr);
188
189         At = GetNewHashPos(RSSFetchUrls, 0);
190         if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
191         {
192                 DeleteEntryFromHash(RSSFetchUrls, At);
193         }
194         DeleteHashPos(&At);
195         last_run = time(NULL);
196         pthread_mutex_unlock(&RSSQueueMutex);
197 }
198
199 void DeleteRssCfg(void *vptr)
200 {
201         rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
202         AsyncIO *IO = &RSSAggr->IO;
203
204         if (IO->CitContext != NULL) {
205                 syslog(LOG_DEBUG, "RSS: destroying\n");
206         }
207
208         FreeStrBuf(&RSSAggr->Url);
209         FreeStrBuf(&RSSAggr->RedirectUrl);
210         FreeStrBuf(&RSSAggr->rooms);
211         FreeStrBuf(&RSSAggr->CData);
212         FreeStrBuf(&RSSAggr->Key);
213         DeleteHash(&RSSAggr->OtherQRnumbers);
214
215         DeleteHashPos (&RSSAggr->Pos);
216         DeleteHash (&RSSAggr->Messages);
217         if (RSSAggr->recp.recp_room != NULL)
218                 free(RSSAggr->recp.recp_room);
219
220
221         if (RSSAggr->Item != NULL)
222         {
223                 flush_rss_item(RSSAggr->Item);
224
225                 free(RSSAggr->Item);
226         }
227
228         FreeAsyncIOContents(&RSSAggr->IO);
229         memset(RSSAggr, 0, sizeof(rss_aggregator));
230         free(RSSAggr);
231 }
232
233 eNextState RSSAggregator_Terminate(AsyncIO *IO)
234 {
235         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
236
237         syslog(LOG_DEBUG, "RSS: Terminating.");
238
239         StopCurlWatchers(IO);
240         UnlinkRSSAggregator(RSSAggr);
241         return eAbort;
242 }
243
244 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
245 {
246         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
247
248         syslog(LOG_DEBUG, "RSS: Terminating.");
249
250
251         StopDBWatchers(&RSSAggr->IO);
252         UnlinkRSSAggregator(RSSAggr);
253         return eAbort;
254 }
255
256 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
257 {
258         const char *pUrl;
259         rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
260
261         pUrl = IO->ConnectMe->PlainUrl;
262         if (pUrl == NULL)
263                 pUrl = "";
264
265         syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.", pUrl);
266
267         StopCurlWatchers(IO);
268         UnlinkRSSAggregator(RSSAggr);
269         return eAbort;
270 }
271
272 void AppendLink(StrBuf *Message,
273                 StrBuf *link,
274                 StrBuf *LinkTitle,
275                 const char *Title)
276 {
277         if (StrLength(link) > 0)
278         {
279                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
280                 StrBufAppendBuf(Message, link, 0);
281                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
282                 if (StrLength(LinkTitle) > 0)
283                         StrBufAppendBuf(Message, LinkTitle, 0);
284                 else if ((Title != NULL) && !IsEmptyStr(Title))
285                         StrBufAppendBufPlain(Message, Title, -1, 0);
286                 else
287                         StrBufAppendBuf(Message, link, 0);
288                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
289         }
290 }
291
292
293 int rss_format_item(AsyncIO *IO, networker_save_message *SaveMsg)
294 {
295         StrBuf *Message;
296         int msglen = 0;
297
298         if (StrLength(SaveMsg->description) + 
299             StrLength(SaveMsg->link) + 
300             StrLength(SaveMsg->linkTitle) + 
301             StrLength(SaveMsg->reLink) +
302             StrLength(SaveMsg->reLinkTitle) +
303             StrLength(SaveMsg->title) == 0)
304         {
305                 syslog(LOG_INFO, "Refusing to save empty message.");
306                 return 0;
307         }
308
309         CM_Flush(&SaveMsg->Msg);
310
311         if (SaveMsg->author_or_creator != NULL) {
312
313                 char *From;
314                 StrBuf *Encoded = NULL;
315                 int FromAt;
316
317                 From = html_to_ascii(ChrPtr(SaveMsg->author_or_creator),
318                                      StrLength(SaveMsg->author_or_creator),
319                                      512, 0);
320                 StrBufPlain(SaveMsg->author_or_creator, From, -1);
321                 StrBufTrim(SaveMsg->author_or_creator);
322                 free(From);
323
324                 FromAt = strchr(ChrPtr(SaveMsg->author_or_creator), '@') != NULL;
325                 if (!FromAt && StrLength (SaveMsg->author_email) > 0)
326                 {
327                         StrBufRFC2047encode(&Encoded, SaveMsg->author_or_creator);
328                         CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded);
329                         CM_SetAsFieldSB(&SaveMsg->Msg, eMessagePath, &SaveMsg->author_email);
330                 }
331                 else
332                 {
333                         if (FromAt)
334                         {
335                                 CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &SaveMsg->author_or_creator);
336                                 CM_CopyField(&SaveMsg->Msg, eMessagePath, eAuthor);
337                         }
338                         else
339                         {
340                                 StrBufRFC2047encode(&Encoded,
341                                                     SaveMsg->author_or_creator);
342                                 CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded);
343                                 CM_SetField(&SaveMsg->Msg, eMessagePath, HKEY("rss@localhost"));
344
345                         }
346                 }
347         }
348         else {
349                 CM_SetField(&SaveMsg->Msg, eAuthor, HKEY("rss"));
350         }
351
352         CM_SetField(&SaveMsg->Msg, eNodeName, CtdlGetConfigStr("c_nodename"), strlen(CtdlGetConfigStr("c_nodename")));
353         if (SaveMsg->title != NULL) {
354                 long len;
355                 char *Sbj;
356                 StrBuf *Encoded, *QPEncoded;
357
358                 QPEncoded = NULL;
359                 StrBufSpaceToBlank(SaveMsg->title);
360                 len = StrLength(SaveMsg->title);
361                 Sbj = html_to_ascii(ChrPtr(SaveMsg->title), len, 512, 0);
362                 if (!IsEmptyStr(Sbj)) {
363                         len = strlen(Sbj);
364                         if ((Sbj[len - 1] == '\n'))
365                         {
366                                 len --;
367                                 Sbj[len] = '\0';
368                         }
369                         Encoded = NewStrBufPlain(Sbj, len);
370                 
371
372                         StrBufTrim(Encoded);
373                         StrBufRFC2047encode(&QPEncoded, Encoded);
374                         
375                         CM_SetAsFieldSB(&SaveMsg->Msg, eMsgSubject, &QPEncoded);
376                         FreeStrBuf(&Encoded);
377                 }
378                 if (Sbj != NULL) {
379                         free(Sbj);
380                 }
381         }
382         if (SaveMsg->link == NULL)
383                 SaveMsg->link = NewStrBufPlain(HKEY(""));
384
385 #if 0 /* temporarily disable shorter urls. */
386         SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] =
387                 GetShorterUrls(SaveMsg->description);
388 #endif
389
390         msglen += 1024 + StrLength(SaveMsg->link) + StrLength(SaveMsg->description) ;
391
392         Message = NewStrBufPlain(NULL, msglen);
393
394         StrBufPlain(Message, HKEY(
395                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
396                             "<html><body>\n"));
397 #if 0 /* disable shorter url for now. */
398         SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
399 #endif
400         StrBufAppendBuf(Message, SaveMsg->description, 0);
401         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
402
403         AppendLink(Message, SaveMsg->link, SaveMsg->linkTitle, NULL);
404         AppendLink(Message, SaveMsg->reLink, SaveMsg->reLinkTitle, "Reply to this");
405         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
406
407         SaveMsg->Message = Message;
408         return 1;
409 }
410
411 eNextState RSSSaveMessage(AsyncIO *IO)
412 {
413         long len;
414         const char *Key;
415         rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
416
417         if (rss_format_item(IO, RSSAggr->ThisMsg))
418         {
419                 CM_SetAsFieldSB(&RSSAggr->ThisMsg->Msg, eMesageText,
420                                        &RSSAggr->ThisMsg->Message);
421
422                 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
423                 
424                 /* write the uidl to the use table so we don't store this item again */
425                 
426                 CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, EvGetNow(IO), 0, eWrite, CCID, IO->ID);
427         }
428
429         if (GetNextHashPos(RSSAggr->Messages,
430                            RSSAggr->Pos,
431                            &len, &Key,
432                            (void**) &RSSAggr->ThisMsg))
433                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
434         else
435                 return eAbort;
436 }
437
438 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
439 {
440         static const time_t antiExpire = USETABLE_ANTIEXPIRE_HIRES;
441 #ifndef DEBUG_RSS
442         time_t seenstamp = 0;
443         const char *Key;
444         long len;
445         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
446
447         /* Find out if we've already seen this item */
448 // todo: expiry?
449         SetRSSState(IO, eRSSUT);
450         seenstamp = CheckIfAlreadySeen("RSS Item Seen",
451                                        Ctx->ThisMsg->MsgGUID,
452                                        EvGetNow(IO),
453                                        antiExpire,
454                                        eCheckUpdate,
455                                        CCID, IO->ID);
456         if (seenstamp != 0)
457         {
458                 /* Item has already been seen */
459                 syslog(LOG_DEBUG, "%s has already been seen - %ld < %ld", ChrPtr(Ctx->ThisMsg->MsgGUID), seenstamp, antiExpire);
460
461                 SetRSSState(IO, eRSSParsing);
462
463                 if (GetNextHashPos(Ctx->Messages,
464                                    Ctx->Pos,
465                                    &len, &Key,
466                                    (void**) &Ctx->ThisMsg))
467                         return NextDBOperation(
468                                 IO,
469                                 RSS_FetchNetworkUsetableEntry);
470                 else
471                         return eAbort;
472         }
473         else
474 #endif
475         {
476                 /* Item has already been seen */
477                 syslog(LOG_DEBUG,
478                               "%s Parsing - %ld >= %ld",
479                               ChrPtr(Ctx->ThisMsg->MsgGUID),
480                               seenstamp, antiExpire);
481                 SetRSSState(IO, eRSSParsing);
482
483                 NextDBOperation(IO, RSSSaveMessage);
484                 return eSendMore;
485         }
486         return eSendMore;
487 }
488
489 void UpdateLastKnownGood(pRSSConfig *pCfg, time_t now)
490 {
491         OneRoomNetCfg *pRNCfg;
492         begin_critical_section(S_NETCONFIGS);
493         pRNCfg = CtdlGetNetCfgForRoom(pCfg->QRnumber);
494         if (pRNCfg != NULL)
495         {
496                 RSSCfgLine *RSSCfg = (RSSCfgLine *)pRNCfg->NetConfigs[rssclient];
497
498                 while (RSSCfg != NULL)
499                 {
500                         if (RSSCfg == pCfg->pCfg)
501                                 break;
502
503                         RSSCfg = RSSCfg->next;
504                 }
505                 if (RSSCfg != NULL)
506                 {
507                         RSSCfg->last_known_good = now;
508                 }
509         }
510         SaveRoomNetConfigFile(pRNCfg, pCfg->QRnumber);
511         FreeRoomNetworkStruct(&pRNCfg);
512         end_critical_section(S_NETCONFIGS);
513 }
514
515 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
516 {
517         HashPos *it = NULL;
518         long len;
519         const char *Key;
520         pRSSConfig *pCfg;
521         u_char rawdigest[MD5_DIGEST_LEN];
522         struct MD5Context md5context;
523         StrBuf *guid;
524         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
525
526
527         if ((IO->HttpReq.httpcode >= 300) && (IO->HttpReq.httpcode < 400)  && (Ctx->RedirectUrl != NULL))
528         {
529                 StrBuf *ErrMsg;
530                 long lens[2];
531                 const char *strs[2];
532
533                 SetRSSState(IO, eRSSFailure);
534                 ErrMsg = NewStrBuf();
535                 if (IO) {
536                         syslog(LOG_INFO, "need a 200, got a %ld !", IO->HttpReq.httpcode);
537                 }
538                 strs[0] = ChrPtr(Ctx->Url);
539                 lens[0] = StrLength(Ctx->Url);
540
541                 strs[1] = ChrPtr(Ctx->rooms);
542                 lens[1] = StrLength(Ctx->rooms);
543
544                 if (IO->HttpReq.CurlError == NULL)
545                         IO->HttpReq.CurlError = "";
546
547                 StrBufPrintf(ErrMsg,
548                      "Error while RSS-Aggregation Run of %s\n"
549                      " need a 200, got a %ld !\n"
550                      " Curl Error message: \n%s / %s\n"
551                      " Redirect header points to: %s\n"
552                      " Response text was: \n"
553                      " \n %s\n",
554                      ChrPtr(Ctx->Url),
555                      IO->HttpReq.httpcode,
556                      IO->HttpReq.errdesc,
557                      IO->HttpReq.CurlError,
558                      ChrPtr(Ctx->RedirectUrl),
559                      ChrPtr(IO->HttpReq.ReplyData)
560                 );
561
562                 CtdlAideFPMessage(
563                         ChrPtr(ErrMsg),
564                         "RSS Aggregation run failure",
565                         2, strs, (long*) &lens,
566                         CCID,
567                         IO->ID,
568                         EvGetNow(IO));
569                 
570                 FreeStrBuf(&ErrMsg);
571                 syslog(LOG_DEBUG,
572                               "RSS feed returned an invalid http status code. <%s><HTTP %ld>",
573                               ChrPtr(Ctx->Url),
574                               IO->HttpReq.httpcode
575                 );
576                 return eAbort;
577         }
578         else if (IO->HttpReq.httpcode != 200)
579         {
580                 StrBuf *ErrMsg;
581                 long lens[2];
582                 const char *strs[2];
583
584                 SetRSSState(IO, eRSSFailure);
585                 ErrMsg = NewStrBuf();
586                 if (IO) {
587                         syslog(LOG_INFO, "need a 200, got a %ld !", IO->HttpReq.httpcode);
588                 }
589                 strs[0] = ChrPtr(Ctx->Url);
590                 lens[0] = StrLength(Ctx->Url);
591
592                 strs[1] = ChrPtr(Ctx->rooms);
593                 lens[1] = StrLength(Ctx->rooms);
594
595                 if (IO->HttpReq.CurlError == NULL)
596                         IO->HttpReq.CurlError = "";
597
598                 StrBufPrintf(ErrMsg,
599                              "Error while RSS-Aggregation Run of %s\n"
600                              " need a 200, got a %ld !\n"
601                              " Curl Error message: \n%s / %s\n"
602                              " Response text was: \n"
603                              " \n %s\n",
604                              ChrPtr(Ctx->Url),
605                              IO->HttpReq.httpcode,
606                              IO->HttpReq.errdesc,
607                              IO->HttpReq.CurlError,
608                              ChrPtr(IO->HttpReq.ReplyData)
609                         );
610
611                 CtdlAideFPMessage(
612                         ChrPtr(ErrMsg),
613                         "RSS Aggregation run failure",
614                         2, strs, (long*) &lens,
615                         CCID,
616                         IO->ID,
617                         EvGetNow(IO));
618                 
619                 FreeStrBuf(&ErrMsg);
620                 syslog(LOG_DEBUG,
621                               "RSS feed returned an invalid http status code. <%s><HTTP %ld>",
622                               ChrPtr(Ctx->Url),
623                               IO->HttpReq.httpcode
624                 );
625                 return eAbort;
626         }
627
628         pCfg = &Ctx->Cfg;
629
630         while (pCfg != NULL)
631         {
632                 UpdateLastKnownGood (pCfg, EvGetNow(IO));
633                 if ((Ctx->roomlist_parts > 1) && 
634                     (it == NULL))
635                 {
636                         it = GetNewHashPos(RSSFetchUrls, 0);
637                 }
638                 if (it != NULL)
639                 {
640                         void *vptr;
641                         if (GetNextHashPos(Ctx->OtherQRnumbers, it, &len, &Key, &vptr))
642                                 pCfg = vptr;
643                         else
644                                 pCfg = NULL;
645                 }
646                 else 
647                         pCfg = NULL;
648         }
649         DeleteHashPos (&it);
650
651         SetRSSState(IO, eRSSUT);
652
653         MD5Init(&md5context);
654
655         MD5Update(&md5context,
656                   (const unsigned char*)SKEY(IO->HttpReq.ReplyData));
657
658         MD5Update(&md5context,
659                   (const unsigned char*)SKEY(Ctx->Url));
660
661         MD5Final(rawdigest, &md5context);
662         guid = NewStrBufPlain(NULL,
663                               MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
664         StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
665         StrBufAppendBufPlain(guid, HKEY("_rssFM"), 0);
666         if (StrLength(guid) > 40)
667                 StrBufCutAt(guid, 40, NULL);
668         /* Find out if we've already seen this item */
669
670 #ifndef DEBUG_RSS
671
672         if (CheckIfAlreadySeen("RSS Whole",
673                                guid,
674                                EvGetNow(IO),
675                                EvGetNow(IO) - USETABLE_ANTIEXPIRE,
676                                eUpdate,
677                                CCID, IO->ID)
678             != 0)
679         {
680                 FreeStrBuf(&guid);
681
682                 syslog(LOG_DEBUG, "RSS feed already seen. <%s>", ChrPtr(Ctx->Url));
683                 return eAbort;
684         }
685         FreeStrBuf(&guid);
686 #endif
687         SetRSSState(IO, eRSSParsing);
688         return RSSAggregator_ParseReply(IO);
689 }
690
691 eNextState RSSAggregator_FinishHttp(AsyncIO *IO)
692 {
693         return CurlQueueDBOperation(IO, RSSAggregator_AnalyseReply);
694 }
695
696 /*
697  * Begin a feed parse
698  */
699 int rss_do_fetching(rss_aggregator *RSSAggr)
700 {
701         AsyncIO         *IO = &RSSAggr->IO;
702         rss_item *ri;
703         time_t now;
704         CURLcode sta;
705         CURL *chnd;
706
707
708         now = time(NULL);
709
710         if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
711                 return 0;
712
713         ri = (rss_item*) malloc(sizeof(rss_item));
714         memset(ri, 0, sizeof(rss_item));
715         RSSAggr->Item = ri;
716
717         if (! InitcURLIOStruct(&RSSAggr->IO,
718                                RSSAggr,
719                                "Citadel RSS Client",
720                                RSSAggregator_FinishHttp,
721                                RSSAggregator_Terminate,
722                                RSSAggregator_TerminateDB,
723                                RSSAggregator_ShutdownAbort))
724         {
725                 syslog(LOG_INFO, "Unable to initialize libcurl.");
726                 return 0;
727         }
728         chnd = IO->HttpReq.chnd;
729         OPT(HEADERDATA, IO);
730         OPT(HEADERFUNCTION, GetLocationString);
731         SetRSSState(IO, eRSSCreated);
732
733         safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
734                     ChrPtr(RSSAggr->Url),
735                     sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
736
737         syslog(LOG_DEBUG, "Fetching RSS feed <%s>", ChrPtr(RSSAggr->Url));
738         ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
739         CurlPrepareURL(RSSAggr->IO.ConnectMe);
740
741         SetRSSState(IO, eRSSFetching);
742         QueueCurlContext(&RSSAggr->IO);
743         return 1;
744 }
745
746 /*
747  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
748  */
749 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
750 {
751         const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient];
752         rss_aggregator *RSSAggr = NULL;
753         rss_aggregator *use_this_RSSAggr = NULL;
754         void *vptr;
755
756         syslog(LOG_DEBUG, "rssclient_scan_room(%s)", qrbuf->QRname);
757         pthread_mutex_lock(&RSSQueueMutex);
758         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
759         {
760                 syslog(LOG_DEBUG,
761                               "rssclient: [%ld] %s already in progress.",
762                               qrbuf->QRnumber,
763                               qrbuf->QRname);
764                 pthread_mutex_unlock(&RSSQueueMutex);
765                 return;
766         }
767         pthread_mutex_unlock(&RSSQueueMutex);
768
769         if (server_shutting_down) return;
770
771         while (RSSCfg != NULL)
772         {
773                 pthread_mutex_lock(&RSSQueueMutex);
774                 GetHash(RSSFetchUrls,
775                         SKEY(RSSCfg->Url),
776                         &vptr);
777
778                 use_this_RSSAggr = (rss_aggregator *)vptr;
779                 if (use_this_RSSAggr != NULL)
780                 {
781                         pRSSConfig *pRSSCfg;
782
783                         StrBufAppendBufPlain(
784                                 use_this_RSSAggr->rooms,
785                                 qrbuf->QRname,
786                                 -1, 0);
787                         if (use_this_RSSAggr->roomlist_parts==1)
788                         {
789                                 use_this_RSSAggr->OtherQRnumbers
790                                         = NewHash(1, lFlathash);
791                         }
792
793                         pRSSCfg = (pRSSConfig *) malloc(sizeof(pRSSConfig));
794
795                         pRSSCfg->QRnumber = qrbuf->QRnumber;
796                         pRSSCfg->pCfg = RSSCfg;
797
798                         Put(use_this_RSSAggr->OtherQRnumbers,
799                             LKEY(qrbuf->QRnumber),
800                             pRSSCfg,
801                             NULL);
802                         use_this_RSSAggr->roomlist_parts++;
803
804                         pthread_mutex_unlock(&RSSQueueMutex);
805
806                         RSSCfg = RSSCfg->next;
807                         continue;
808                 }
809                 pthread_mutex_unlock(&RSSQueueMutex);
810
811                 RSSAggr = (rss_aggregator *) malloc(
812                         sizeof(rss_aggregator));
813
814                 memset (RSSAggr, 0, sizeof(rss_aggregator));
815                 RSSAggr->Cfg.QRnumber = qrbuf->QRnumber;
816                 RSSAggr->Cfg.pCfg = RSSCfg;
817                 RSSAggr->roomlist_parts = 1;
818                 RSSAggr->Url = NewStrBufDup(RSSCfg->Url);
819
820                 RSSAggr->ItemType = RSS_UNSET;
821
822                 RSSAggr->rooms = NewStrBufPlain(
823                         qrbuf->QRname, -1);
824
825                 pthread_mutex_lock(&RSSQueueMutex);
826
827                 Put(RSSFetchUrls,
828                     SKEY(RSSAggr->Url),
829                     RSSAggr,
830                     DeleteRssCfg);
831
832                 pthread_mutex_unlock(&RSSQueueMutex);
833                 RSSCfg = RSSCfg->next;
834         }
835 }
836
837 /*
838  * Scan for rooms that have RSS client requests configured
839  */
840 void rssclient_scan(void) {
841         int RSSRoomCount, RSSCount;
842         rss_aggregator *rptr = NULL;
843         void *vrptr = NULL;
844         HashPos *it;
845         long len;
846         const char *Key;
847         time_t now = time(NULL);
848
849         /* Run no more than once every 15 minutes. */
850         if ((now - last_run) < 900) {
851                 syslog(LOG_DEBUG,
852                               "Client: polling interval not yet reached; last run was %ldm%lds ago",
853                               ((now - last_run) / 60),
854                               ((now - last_run) % 60)
855                 );
856                 return;
857         }
858
859         /*
860          * This is a simple concurrency check to make sure only one rssclient
861          * run is done at a time.
862          */
863         pthread_mutex_lock(&RSSQueueMutex);
864         RSSCount = GetCount(RSSFetchUrls);
865         RSSRoomCount = GetCount(RSSQueueRooms);
866         pthread_mutex_unlock(&RSSQueueMutex);
867
868         if ((RSSRoomCount > 0) || (RSSCount > 0)) {
869                 syslog(LOG_DEBUG,
870                               "rssclient: concurrency check failed; %d rooms and %d url's are queued",
871                               RSSRoomCount, RSSCount
872                         );
873                 return;
874         }
875
876         become_session(&rss_CC);
877         syslog(LOG_DEBUG, "rssclient started");
878         CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient);
879
880         if (GetCount(RSSFetchUrls) > 0)
881         {
882                 pthread_mutex_lock(&RSSQueueMutex);
883                 syslog(LOG_DEBUG,
884                                "rssclient starting %d Clients",
885                                GetCount(RSSFetchUrls));
886                 
887                 it = GetNewHashPos(RSSFetchUrls, 0);
888                 while (!server_shutting_down &&
889                        GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
890                        (vrptr != NULL)) {
891                         rptr = (rss_aggregator *)vrptr;
892                         if (!rss_do_fetching(rptr))
893                                 UnlinkRSSAggregator(rptr);
894                 }
895                 DeleteHashPos(&it);
896                 pthread_mutex_unlock(&RSSQueueMutex);
897         }
898         else {
899                 syslog(LOG_DEBUG, "Nothing to do.");
900         }
901
902         syslog(LOG_DEBUG, "rssclient ended");
903         return;
904 }
905
906 void rss_cleanup(void)
907 {
908         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
909         DeleteHash(&RSSFetchUrls);
910         DeleteHash(&RSSQueueRooms);
911 }
912
913 void LogDebugEnableRSSClient(const int n)
914 {
915         RSSClientDebugEnabled = n;
916 }
917
918
919 typedef struct __RSSVetoInfo {
920         StrBuf *ErrMsg;
921         time_t Now;
922         int Veto;
923 }RSSVetoInfo;
924
925 void rssclient_veto_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
926 {
927         RSSVetoInfo *Info = (RSSVetoInfo *) data;
928         const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient];
929
930         while (RSSCfg != NULL)
931         {
932                 if ((RSSCfg->last_known_good != 0) &&
933                     (RSSCfg->last_known_good + USETABLE_ANTIEXPIRE < Info->Now))
934                 {
935                         StrBufAppendPrintf(Info->ErrMsg,
936                                            "RSS feed not seen for a %d days:: <",
937                                            (Info->Now - RSSCfg->last_known_good) / (24 * 60 * 60));
938
939                         StrBufAppendBuf(Info->ErrMsg, RSSCfg->Url, 0);
940                         StrBufAppendBufPlain(Info->ErrMsg, HKEY(">\n"), 0);
941                 }
942                 RSSCfg = RSSCfg->next;
943         }
944 }
945
946 int RSSCheckUsetableVeto(StrBuf *ErrMsg)
947 {
948         RSSVetoInfo Info;
949
950         Info.ErrMsg = ErrMsg;
951         Info.Now = time (NULL);
952         Info.Veto = 0;
953
954         CtdlForEachNetCfgRoom(rssclient_veto_scan_room, &Info, rssclient);
955
956         return Info.Veto;;
957 }
958
959
960
961
962 void ParseRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, OneRoomNetCfg *OneRNCFG)
963 {
964         RSSCfgLine *RSSCfg;
965
966         RSSCfg = (RSSCfgLine *) malloc (sizeof(RSSCfgLine));
967         RSSCfg->Url = NewStrBufPlain (NULL, StrLength (Line));
968         
969
970         StrBufExtract_NextToken(RSSCfg->Url, Line, &LinePos, '|');
971         RSSCfg->last_known_good = StrBufExtractNext_long(Line, &LinePos, '|');
972
973
974         RSSCfg->next = (RSSCfgLine *)OneRNCFG->NetConfigs[ThisOne->C];
975         OneRNCFG->NetConfigs[ThisOne->C] = (RoomNetCfgLine*) RSSCfg;
976 }
977
978 void SerializeRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *OutputBuffer, OneRoomNetCfg *RNCfg, RoomNetCfgLine *data)
979 {
980         RSSCfgLine *RSSCfg = (RSSCfgLine*) data;
981
982         StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0);
983         StrBufAppendBufPlain(OutputBuffer, HKEY("|"), 0);
984         StrBufAppendBufPlain(OutputBuffer, SKEY(RSSCfg->Url), 0);
985         StrBufAppendPrintf(OutputBuffer, "|%ld\n", RSSCfg->last_known_good);
986 }
987
988 void DeleteRSSClientCfgLine(const CfgLineType *ThisOne, RoomNetCfgLine **data)
989 {
990         RSSCfgLine *RSSCfg = (RSSCfgLine*) *data;
991
992         FreeStrBuf(&RSSCfg->Url);
993         free(*data);
994         *data = NULL;
995 }
996
997
998 CTDL_MODULE_INIT(rssclient)
999 {
1000         if (!threading)
1001         {
1002                 CtdlRegisterTDAPVetoHook (RSSCheckUsetableVeto, CDB_USETABLE, 0);
1003
1004                 CtdlREGISTERRoomCfgType(rssclient, ParseRSSClientCfgLine, 0, 1, SerializeRSSClientCfgLine, DeleteRSSClientCfgLine);
1005                 pthread_mutex_init(&RSSQueueMutex, NULL);
1006                 RSSQueueRooms = NewHash(1, lFlathash);
1007                 RSSFetchUrls = NewHash(1, NULL);
1008                 syslog(LOG_INFO, "%s\n", curl_version());
1009                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
1010                 CtdlRegisterEVCleanupHook(rss_cleanup);
1011                 CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
1012         }
1013         else
1014         {
1015                 CtdlFillSystemContext(&rss_CC, "rssclient");
1016         }
1017         return "rssclient";
1018 }