db6d37fb0f3bf31c8eb555a0ae6b888d560b2ddd
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2010 by the citadel.org team
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35
36 #include <ctype.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <expat.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
44 #include "citadel.h"
45 #include "server.h"
46 #include "citserver.h"
47 #include "support.h"
48 #include "config.h"
49 #include "threads.h"
50 #include "ctdl_module.h"
51 #include "msgbase.h"
52 #include "parsedate.h"
53 #include "database.h"
54 #include "citadel_dirs.h"
55 #include "md5.h"
56 #include "context.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
59
60
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
64
65 time_t last_run = 0L;
66
67 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
68 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
69 HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */
70
71 eNextState RSSAggregatorTerminate(AsyncIO *IO);
72 eNextState RSSAggregatorShutdownAbort(AsyncIO *IO);
73 struct CitContext rss_CC;
74
75 struct rssnetcfg *rnclist = NULL;
76 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
77 {
78         if (StrLength(link) > 0)
79         {
80                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
81                 StrBufAppendBuf(Message, link, 0);
82                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
83                 if (StrLength(LinkTitle) > 0)
84                         StrBufAppendBuf(Message, LinkTitle, 0);
85                 else if ((Title != NULL) && !IsEmptyStr(Title))
86                         StrBufAppendBufPlain(Message, Title, -1, 0);
87                 else
88                         StrBufAppendBuf(Message, link, 0);
89                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
90         }
91 }
92
93
94 void DeleteRoomReference(long QRnumber)
95 {
96         HashPos *At;
97         long HKLen;
98         const char *HK;
99         void *vData = NULL;
100         rss_room_counter *pRoomC;
101
102         At = GetNewHashPos(RSSQueueRooms, 0);
103
104         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
105         {
106                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
107                 if (vData != NULL)
108                 {
109                         pRoomC = (rss_room_counter *) vData;
110                         pRoomC->count --;
111                         if (pRoomC->count == 0)
112                                 DeleteEntryFromHash(RSSQueueRooms, At);
113                 }
114         }
115         DeleteHashPos(&At);
116 }
117
118 void UnlinkRooms(rss_aggregator *Cfg)
119 {
120         
121         DeleteRoomReference(Cfg->QRnumber);
122         if (Cfg->OtherQRnumbers != NULL)
123         {
124                 long HKLen;
125                 const char *HK;
126                 HashPos *At;
127                 void *vData;
128
129                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
130                 while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) && 
131                        (vData != NULL))
132                 {
133                         long *lData = (long*) vData;
134                         DeleteRoomReference(*lData);
135                 }
136 /*
137                 if (server_shutting_down)
138                         break; / * TODO */
139
140                 DeleteHashPos(&At);
141         }
142 }
143
144 void UnlinkRSSAggregator(rss_aggregator *Cfg)
145 {
146         HashPos *At;
147
148         UnlinkRooms(Cfg);
149
150         At = GetNewHashPos(RSSFetchUrls, 0);
151         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
152         {
153                 DeleteEntryFromHash(RSSFetchUrls, At);
154         }
155         DeleteHashPos(&At);
156         last_run = time(NULL);
157 }
158 /*
159 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
160 {
161         networker_save_message *Ctx = (networker_save_message *) IO->Data;
162
163         pthread_mutex_lock(&RSSQueueMutex);
164         Ctx->Cfg->RefCount --;
165
166         if (Ctx->Cfg->RefCount == 0)
167         {
168                 UnlinkRSSAggregator(Ctx->Cfg);
169
170         }
171         pthread_mutex_unlock(&RSSQueueMutex);
172
173         CtdlFreeMessage(Ctx->Msg);
174         free_recipients(Ctx->recp);
175         FreeStrBuf(&Ctx->Message);
176         FreeStrBuf(&Ctx->MsgGUID);
177         ((struct CitContext*)IO->CitContext)->state = CON_IDLE;
178         ((struct CitContext*)IO->CitContext)->kill_me = 1;
179         free(Ctx);
180         last_run = time(NULL);
181         return eAbort;
182 }
183 */
184 void FreeNetworkSaveMessage (void *vMsg)
185 {
186         networker_save_message *Msg = (networker_save_message *) vMsg;
187
188         CtdlFreeMessage(Msg->Msg);
189         FreeStrBuf(&Msg->Message);
190         FreeStrBuf(&Msg->MsgGUID);
191         free(Msg);
192 }
193
194 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
195 {
196         return eAbort; ///TODO
197 }
198
199 eNextState RSSSaveMessage(AsyncIO *IO)
200 {
201         long len;
202         const char *Key;
203         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
204
205         Ctx->ThisMsg->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message);
206
207         CtdlSubmitMsg(Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0);
208
209         /* write the uidl to the use table so we don't store this item again */
210         cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
211         
212         if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
213                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
214         else
215                 return eAbort;
216 }
217
218 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
219 {
220         const char *Key;
221         long len;
222         struct cdbdata *cdbut;
223         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
224
225
226         /* Find out if we've already seen this item */
227         strcpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
228         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
229
230         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
231 #ifndef DEBUG_RSS
232         if (cdbut != NULL) {
233                 /* Item has already been seen */
234                 EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
235                 cdb_free(cdbut);
236
237                 /* rewrite the record anyway, to update the timestamp */
238                 cdb_store(CDB_USETABLE, 
239                           SKEY(Ctx->ThisMsg->MsgGUID), 
240                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
241
242                 if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
243                         return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
244                 else
245                         return eAbort;
246         }
247         else
248 #endif
249         {
250                 NextDBOperation(IO, RSSSaveMessage);
251                 return eSendMore;
252         }
253 }
254 /*
255 void RSSAddSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregat *Cfg)
256 {
257         networker_save_message *Ctx;
258
259         pthread_mutex_lock(&RSSQueueMutex);
260         Cfg->RefCount ++;
261         pthread_mutex_unlock(&RSSQueueMutex);
262
263
264         Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
265         memset(Ctx, 0, sizeof(networker_save_message));
266         
267         Ctx->MsgGUID = MsgGUID;
268         Ctx->Message = MessageBody;
269         Ctx->Msg = Msg;
270         Ctx->Cfg = Cfg;
271         Ctx->recp = recp;
272         Ctx->IO.Data = Ctx;
273         Ctx->IO.CitContext = CloneContext(&rss_CC);
274         Ctx->IO.Terminate = FreeNetworkSaveMessage;
275         Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
276         QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
277 }
278 */
279
280 /*
281  * Commit a fetched and parsed RSS item to disk
282  */
283 void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
284 {
285
286         struct MD5Context md5context;
287         u_char rawdigest[MD5_DIGEST_LEN];
288         struct CtdlMessage *msg;
289         int msglen = 0;
290         StrBuf *Message;
291         StrBuf *guid;
292         AsyncIO *IO = &Cfg->IO;
293
294         int n;
295    
296         /* Construct a GUID to use in the S_USETABLE table.
297          * If one is not present in the item itself, make one up.
298          */
299         if (ri->guid != NULL) {
300                 StrBufSpaceToBlank(ri->guid);
301                 StrBufTrim(ri->guid);
302                 guid = NewStrBufPlain(HKEY("rss/"));
303                 StrBufAppendBuf(guid, ri->guid, 0);
304         }
305         else {
306                 MD5Init(&md5context);
307                 if (ri->title != NULL) {
308                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
309                 }
310                 if (ri->link != NULL) {
311                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
312                 }
313                 MD5Final(rawdigest, &md5context);
314                 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
315                 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
316                 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
317         }
318
319         /* translate Item into message. */
320         EVM_syslog(LOG_DEBUG, "RSS: translating item...\n");
321         if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
322         StrBufSpaceToBlank(ri->description);
323         msg = malloc(sizeof(struct CtdlMessage));
324         memset(msg, 0, sizeof(struct CtdlMessage));
325         msg->cm_magic = CTDLMESSAGE_MAGIC;
326         msg->cm_anon_type = MES_NORMAL;
327         msg->cm_format_type = FMT_RFC822;
328
329         if (ri->guid != NULL) {
330                 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
331         }
332
333         if (ri->author_or_creator != NULL) {
334                 char *From;
335                 StrBuf *Encoded = NULL;
336                 int FromAt;
337                         
338                 From = html_to_ascii(ChrPtr(ri->author_or_creator),
339                                      StrLength(ri->author_or_creator), 
340                                      512, 0);
341                 StrBufPlain(ri->author_or_creator, From, -1);
342                 StrBufTrim(ri->author_or_creator);
343                 free(From);
344
345                 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
346                 if (!FromAt && StrLength (ri->author_email) > 0)
347                 {
348                         StrBufRFC2047encode(&Encoded, ri->author_or_creator);
349                         msg->cm_fields['A'] = SmashStrBuf(&Encoded);
350                         msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
351                 }
352                 else
353                 {
354                         if (FromAt)
355                         {
356                                 msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator);
357                                 msg->cm_fields['P'] = strdup(msg->cm_fields['A']);
358                         }
359                         else 
360                         {
361                                 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
362                                 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
363                                 msg->cm_fields['P'] = strdup("rss@localhost");
364
365                         }
366                         if (ri->pubdate <= 0) {
367                                 ri->pubdate = time(NULL);
368                         }
369                 }
370         }
371         else {
372                 msg->cm_fields['A'] = strdup("rss");
373         }
374
375         msg->cm_fields['N'] = strdup(NODENAME);
376         if (ri->title != NULL) {
377                 long len;
378                 char *Sbj;
379                 StrBuf *Encoded, *QPEncoded;
380
381                 QPEncoded = NULL;
382                 StrBufSpaceToBlank(ri->title);
383                 len = StrLength(ri->title);
384                 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
385                 len = strlen(Sbj);
386                 if (Sbj[len - 1] == '\n')
387                 {
388                         len --;
389                         Sbj[len] = '\0';
390                 }
391                 Encoded = NewStrBufPlain(Sbj, len);
392                 free(Sbj);
393
394                 StrBufTrim(Encoded);
395                 StrBufRFC2047encode(&QPEncoded, Encoded);
396
397                 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
398                 FreeStrBuf(&Encoded);
399         }
400         msg->cm_fields['T'] = malloc(64);
401         snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
402         if (ri->channel_title != NULL) {
403                 if (StrLength(ri->channel_title) > 0) {
404                         msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
405                 }
406         }
407         if (ri->link == NULL) 
408                 ri->link = NewStrBufPlain(HKEY(""));
409
410 #if 0 /* temporarily disable shorter urls. */
411         msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
412 #endif
413
414         msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
415
416         Message = NewStrBufPlain(NULL, StrLength(ri->description));
417
418         StrBufPlain(Message, HKEY(
419                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
420                             "<html><body>\n"));
421 #if 0 /* disable shorter url for now. */
422         msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
423 #endif
424         StrBufAppendBuf(Message, ri->description, 0);
425         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
426
427         AppendLink(Message, ri->link, ri->linkTitle, NULL);
428         AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
429         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
430
431
432
433         networker_save_message *SaveMsg;
434
435         SaveMsg = (networker_save_message *) malloc(sizeof(networker_save_message));
436         memset(SaveMsg, 0, sizeof(networker_save_message));
437         
438         SaveMsg->MsgGUID = guid;
439         SaveMsg->Message = Message;
440         SaveMsg->Msg = msg;
441
442         n = GetCount(Cfg->Messages) + 1;
443         Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage);
444 }
445
446
447
448 /*
449  * Begin a feed parse
450  */
451 int rss_do_fetching(rss_aggregator *Cfg)
452 {
453         rss_item *ri;
454                 
455         time_t now;
456         AsyncIO *IO;
457
458         now = time(NULL);
459
460         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
461                 return 0;
462
463         ri = (rss_item*) malloc(sizeof(rss_item));
464         memset(ri, 0, sizeof(rss_item));
465         Cfg->Item = ri;
466         IO = &Cfg->IO;
467         IO->CitContext = CloneContext(&rss_CC);
468         IO->Data = Cfg;
469
470         safestrncpy(((CitContext*)IO->CitContext)->cs_host, 
471                     ChrPtr(Cfg->Url),
472                     sizeof(((CitContext*)IO->CitContext)->cs_host)); 
473
474         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
475         ParseURL(&IO->ConnectMe, Cfg->Url, 80);
476         CurlPrepareURL(IO->ConnectMe);
477
478         if (! evcurl_init(IO, 
479 //                        Ctx, 
480                           NULL,
481                           "Citadel RSS Client",
482                           ParseRSSReply, 
483                           RSSAggregatorTerminate,
484                           RSSAggregatorShutdownAbort))
485         {
486                 syslog(LOG_DEBUG, "Unable to initialize libcurl.\n");
487                 return 0;
488         }
489
490         QueueCurlContext(IO);
491         return 1;
492 }
493
494
495 void DeleteRssCfg(void *vptr)
496 {
497         rss_aggregator *rncptr = (rss_aggregator *)vptr;
498         AsyncIO *IO = &rncptr->IO;
499         EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
500
501         FreeStrBuf(&rncptr->Url);
502         FreeStrBuf(&rncptr->rooms);
503         FreeStrBuf(&rncptr->CData);
504         FreeStrBuf(&rncptr->Key);
505         FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
506         DeleteHash(&rncptr->OtherQRnumbers);
507         FreeURL(&rncptr->IO.ConnectMe);
508
509         DeleteHashPos (&rncptr->Pos);
510         DeleteHash (&rncptr->Messages);
511         if (rncptr->recp.recp_room != NULL)
512                 free(rncptr->recp.recp_room);
513
514
515         if (rncptr->Item != NULL)
516         {
517                 FreeStrBuf(&rncptr->Item->guid);
518                 FreeStrBuf(&rncptr->Item->title);
519                 FreeStrBuf(&rncptr->Item->link);
520                 FreeStrBuf(&rncptr->Item->linkTitle);
521                 FreeStrBuf(&rncptr->Item->reLink);
522                 FreeStrBuf(&rncptr->Item->reLinkTitle);
523                 FreeStrBuf(&rncptr->Item->description);
524                 FreeStrBuf(&rncptr->Item->channel_title);
525                 FreeStrBuf(&rncptr->Item->author_or_creator);
526                 FreeStrBuf(&rncptr->Item->author_url);
527                 FreeStrBuf(&rncptr->Item->author_email);
528
529                 free(rncptr->Item);
530         }
531         free(rncptr);
532 }
533
534 eNextState RSSAggregatorTerminate(AsyncIO *IO)
535 {
536         rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
537
538         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
539
540
541         UnlinkRSSAggregator(rncptr);
542         return eAbort;
543 }
544 eNextState RSSAggregatorShutdownAbort(AsyncIO *IO)
545 {
546         const char *pUrl;
547         rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
548
549         pUrl = IO->ConnectMe->PlainUrl;
550         if (pUrl == NULL)
551                 pUrl = "";
552
553         EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
554
555
556         UnlinkRSSAggregator(rncptr);
557         return eAbort;
558 }
559
560 /*
561  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
562  */
563 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
564 {
565         StrBuf *CfgData=NULL;
566         StrBuf *CfgType;
567         StrBuf *Line;
568         rss_room_counter *Count = NULL;
569         struct stat statbuf;
570         char filename[PATH_MAX];
571         int  fd;
572         int Done;
573         rss_aggregator *rncptr = NULL;
574         rss_aggregator *use_this_rncptr = NULL;
575         void *vptr;
576         const char *CfgPtr, *lPtr;
577         const char *Err;
578
579         pthread_mutex_lock(&RSSQueueMutex);
580         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
581         {
582                 syslog(LOG_DEBUG, 
583                               "rssclient: [%ld] %s already in progress.\n", 
584                               qrbuf->QRnumber, 
585                               qrbuf->QRname);
586                 pthread_mutex_unlock(&RSSQueueMutex);
587                 return;
588         }
589         pthread_mutex_unlock(&RSSQueueMutex);
590
591         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
592
593         if (server_shutting_down)
594                 return;
595                 
596         /* Only do net processing for rooms that have netconfigs */
597         fd = open(filename, 0);
598         if (fd <= 0) {
599                 //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
600                 return;
601         }
602
603         if (server_shutting_down)
604                 return;
605
606         if (fstat(fd, &statbuf) == -1) {
607                 syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
608                        filename, strerror(errno));
609                 return;
610         }
611
612         if (server_shutting_down)
613                 return;
614
615         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
616
617         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
618                 close(fd);
619                 FreeStrBuf(&CfgData);
620                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
621                         filename, strerror(errno));
622                 return;
623         }
624         close(fd);
625         if (server_shutting_down)
626                 return;
627         
628         CfgPtr = NULL;
629         CfgType = NewStrBuf();
630         Line = NewStrBufPlain(NULL, StrLength(CfgData));
631         Done = 0;
632         while (!Done)
633         {
634             Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
635             if (StrLength(Line) > 0)
636             {
637                 lPtr = NULL;
638                 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
639                 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
640                 {
641                     if (Count == NULL)
642                     {
643                         Count = malloc(sizeof(rss_room_counter));
644                         Count->count = 0;
645                     }
646                     Count->count ++;
647                     rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
648                     memset (rncptr, 0, sizeof(rss_aggregator));
649                     rncptr->roomlist_parts = 1;
650                     rncptr->Url = NewStrBuf();
651                     StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
652
653                     pthread_mutex_lock(&RSSQueueMutex);
654                     GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
655                     use_this_rncptr = (rss_aggregator *)vptr;
656                     if (use_this_rncptr != NULL)
657                     {
658                             long *QRnumber;
659                             StrBufAppendBufPlain(use_this_rncptr->rooms, 
660                                                  qrbuf->QRname, 
661                                                  -1, 0);
662                             if (use_this_rncptr->roomlist_parts == 1)
663                             {
664                                     use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
665                             }
666                             QRnumber = (long*)malloc(sizeof(long));
667                             *QRnumber = qrbuf->QRnumber;
668                             Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
669                             use_this_rncptr->roomlist_parts++;
670
671                             pthread_mutex_unlock(&RSSQueueMutex);
672
673                             FreeStrBuf(&rncptr->Url);
674                             free(rncptr);
675                             rncptr = NULL;
676                             continue;
677                     }
678                     pthread_mutex_unlock(&RSSQueueMutex);
679
680                     rncptr->ItemType = RSS_UNSET;
681                                 
682                     rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
683
684                     pthread_mutex_lock(&RSSQueueMutex);
685                     Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
686                     pthread_mutex_unlock(&RSSQueueMutex);
687                 }
688             }
689         }
690         if (Count != NULL)
691         {
692                 Count->QRnumber = qrbuf->QRnumber;
693                 pthread_mutex_lock(&RSSQueueMutex);
694                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", 
695                               qrbuf->QRnumber, qrbuf->QRname);
696                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
697                 pthread_mutex_unlock(&RSSQueueMutex);
698         }
699         FreeStrBuf(&CfgData);
700         FreeStrBuf(&CfgType);
701         FreeStrBuf(&Line);
702 }
703
704 /*
705  * Scan for rooms that have RSS client requests configured
706  */
707 void rssclient_scan(void) {
708         static int doing_rssclient = 0;
709         rss_aggregator *rptr = NULL;
710         void *vrptr = NULL;
711         HashPos  *it;
712         long len;
713         const char *Key;
714
715         /* Run no more than once every 15 minutes. */
716         if ((time(NULL) - last_run) < 900) {
717                 return;
718         }
719
720         /*
721          * This is a simple concurrency check to make sure only one rssclient run
722          * is done at a time.  We could do this with a mutex, but since we
723          * don't really require extremely fine granularity here, we'll do it
724          * with a static variable instead.
725          */
726         if (doing_rssclient) return;
727         doing_rssclient = 1;
728         if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
729                 return;
730
731         syslog(LOG_DEBUG, "rssclient started\n");
732         CtdlForEachRoom(rssclient_scan_room, NULL);
733
734         pthread_mutex_lock(&RSSQueueMutex);
735
736         it = GetNewHashPos(RSSFetchUrls, 0);
737         while (!server_shutting_down &&
738                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && 
739                (vrptr != NULL)) {
740                 rptr = (rss_aggregator *)vrptr;
741                 if (!rss_do_fetching(rptr))
742                         UnlinkRSSAggregator(rptr);
743         }
744         DeleteHashPos(&it);
745         pthread_mutex_unlock(&RSSQueueMutex);
746
747         syslog(LOG_DEBUG, "rssclient ended\n");
748         doing_rssclient = 0;
749         return;
750 }
751
752 void rss_cleanup(void)
753 {
754         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
755         DeleteHash(&RSSFetchUrls);
756         DeleteHash(&RSSQueueRooms);
757 }
758
759
760 CTDL_MODULE_INIT(rssclient)
761 {
762         if (threading)
763         {
764                 CtdlFillSystemContext(&rss_CC, "rssclient");
765                 pthread_mutex_init(&RSSQueueMutex, NULL);
766                 RSSQueueRooms = NewHash(1, lFlathash);
767                 RSSFetchUrls = NewHash(1, NULL);
768                 syslog(LOG_INFO, "%s\n", curl_version());
769                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
770                 CtdlRegisterCleanupHook(rss_cleanup);
771         }
772         return "rssclient";
773 }