Before removing an entry from a hash, revalidate whether we actually found it
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2010 by the citadel.org team
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35
36 #include <ctype.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <expat.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
44 #include "citadel.h"
45 #include "server.h"
46 #include "citserver.h"
47 #include "support.h"
48 #include "config.h"
49 #include "threads.h"
50 #include "ctdl_module.h"
51 #include "msgbase.h"
52 #include "parsedate.h"
53 #include "database.h"
54 #include "citadel_dirs.h"
55 #include "md5.h"
56 #include "context.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
59
60
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
64
65 time_t last_run = 0L;
66
67 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
68 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
69 HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */
70
71 eNextState RSSAggregatorTerminate(AsyncIO *IO);
72 eNextState RSSAggregatorShutdownAbort(AsyncIO *IO);
73 struct CitContext rss_CC;
74
75 struct rssnetcfg *rnclist = NULL;
76 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
77 {
78         if (StrLength(link) > 0)
79         {
80                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
81                 StrBufAppendBuf(Message, link, 0);
82                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
83                 if (StrLength(LinkTitle) > 0)
84                         StrBufAppendBuf(Message, LinkTitle, 0);
85                 else if ((Title != NULL) && !IsEmptyStr(Title))
86                         StrBufAppendBufPlain(Message, Title, -1, 0);
87                 else
88                         StrBufAppendBuf(Message, link, 0);
89                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
90         }
91 }
92
93
94 void DeleteRoomReference(long QRnumber)
95 {
96         HashPos *At;
97         long HKLen;
98         const char *HK;
99         void *vData = NULL;
100         rss_room_counter *pRoomC;
101
102         At = GetNewHashPos(RSSQueueRooms, 0);
103
104         if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
105         {
106                 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
107                 if (vData != NULL)
108                 {
109                         pRoomC = (rss_room_counter *) vData;
110                         pRoomC->count --;
111                         if (pRoomC->count == 0)
112                                 DeleteEntryFromHash(RSSQueueRooms, At);
113                 }
114         }
115         DeleteHashPos(&At);
116 }
117
118 void UnlinkRooms(rss_aggregator *Cfg)
119 {
120         
121         DeleteRoomReference(Cfg->QRnumber);
122         if (Cfg->OtherQRnumbers != NULL)
123         {
124                 long HKLen;
125                 const char *HK;
126                 HashPos *At;
127                 void *vData;
128
129                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
130                 while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) && 
131                        (vData != NULL))
132                 {
133                         long *lData = (long*) vData;
134                         DeleteRoomReference(*lData);
135                 }
136 /*
137                 if (server_shutting_down)
138                         break; / * TODO */
139
140                 DeleteHashPos(&At);
141         }
142 }
143
144 void UnlinkRSSAggregator(rss_aggregator *Cfg)
145 {
146         HashPos *At;
147
148         UnlinkRooms(Cfg);
149
150         At = GetNewHashPos(RSSFetchUrls, 0);
151         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
152         {
153                 DeleteEntryFromHash(RSSFetchUrls, At);
154         }
155         DeleteHashPos(&At);
156         last_run = time(NULL);
157 }
158 /*
159 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
160 {
161         networker_save_message *Ctx = (networker_save_message *) IO->Data;
162
163         pthread_mutex_lock(&RSSQueueMutex);
164         Ctx->Cfg->RefCount --;
165
166         if (Ctx->Cfg->RefCount == 0)
167         {
168                 UnlinkRSSAggregator(Ctx->Cfg);
169
170         }
171         pthread_mutex_unlock(&RSSQueueMutex);
172
173         CtdlFreeMessage(Ctx->Msg);
174         free_recipients(Ctx->recp);
175         FreeStrBuf(&Ctx->Message);
176         FreeStrBuf(&Ctx->MsgGUID);
177         ((struct CitContext*)IO->CitContext)->state = CON_IDLE;
178         ((struct CitContext*)IO->CitContext)->kill_me = 1;
179         free(Ctx);
180         last_run = time(NULL);
181         return eAbort;
182 }
183 */
184 void FreeNetworkSaveMessage (void *vMsg)
185 {
186         networker_save_message *Msg = (networker_save_message *) vMsg;
187
188         CtdlFreeMessage(Msg->Msg);
189         FreeStrBuf(&Msg->Message);
190         FreeStrBuf(&Msg->MsgGUID);
191         free(Msg);
192 }
193
194 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
195 {
196         return eAbort; ///TODO
197 }
198
199 eNextState RSSSaveMessage(AsyncIO *IO)
200 {
201         long len;
202         const char *Key;
203         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
204
205         Ctx->ThisMsg->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message);
206
207         CtdlSubmitMsg(Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0);
208
209         /* write the uidl to the use table so we don't store this item again */
210         cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
211         
212         if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
213                 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
214         else
215                 return eAbort;
216 }
217
218 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
219 {
220         const char *Key;
221         long len;
222         struct cdbdata *cdbut;
223         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
224
225
226         /* Find out if we've already seen this item */
227         strcpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
228         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
229
230         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
231 #ifndef DEBUG_RSS
232         if (cdbut != NULL) {
233                 /* Item has already been seen */
234                 EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
235                 cdb_free(cdbut);
236
237                 /* rewrite the record anyway, to update the timestamp */
238                 cdb_store(CDB_USETABLE, 
239                           SKEY(Ctx->ThisMsg->MsgGUID), 
240                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
241
242                 if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
243                         return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
244                 else
245                         return eAbort;
246         }
247         else
248 #endif
249         {
250                 NextDBOperation(IO, RSSSaveMessage);
251                 return eSendMore;
252         }
253 }
254 /*
255 void RSSAddSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregat *Cfg)
256 {
257         networker_save_message *Ctx;
258
259         pthread_mutex_lock(&RSSQueueMutex);
260         Cfg->RefCount ++;
261         pthread_mutex_unlock(&RSSQueueMutex);
262
263
264         Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
265         memset(Ctx, 0, sizeof(networker_save_message));
266         
267         Ctx->MsgGUID = MsgGUID;
268         Ctx->Message = MessageBody;
269         Ctx->Msg = Msg;
270         Ctx->Cfg = Cfg;
271         Ctx->recp = recp;
272         Ctx->IO.Data = Ctx;
273         Ctx->IO.CitContext = CloneContext(&rss_CC);
274         Ctx->IO.Terminate = FreeNetworkSaveMessage;
275         Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
276         QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
277 }
278 */
279
280 /*
281  * Commit a fetched and parsed RSS item to disk
282  */
283 void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
284 {
285
286         struct MD5Context md5context;
287         u_char rawdigest[MD5_DIGEST_LEN];
288         struct CtdlMessage *msg;
289         int msglen = 0;
290         StrBuf *Message;
291         StrBuf *guid;
292         AsyncIO *IO = &Cfg->IO;
293
294         int n;
295    
296         /* Construct a GUID to use in the S_USETABLE table.
297          * If one is not present in the item itself, make one up.
298          */
299         if (ri->guid != NULL) {
300                 StrBufSpaceToBlank(ri->guid);
301                 StrBufTrim(ri->guid);
302                 guid = NewStrBufPlain(HKEY("rss/"));
303                 StrBufAppendBuf(guid, ri->guid, 0);
304         }
305         else {
306                 MD5Init(&md5context);
307                 if (ri->title != NULL) {
308                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
309                 }
310                 if (ri->link != NULL) {
311                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
312                 }
313                 MD5Final(rawdigest, &md5context);
314                 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
315                 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
316                 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
317         }
318
319         /* translate Item into message. */
320         EVM_syslog(LOG_DEBUG, "RSS: translating item...\n");
321         if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
322         StrBufSpaceToBlank(ri->description);
323         msg = malloc(sizeof(struct CtdlMessage));
324         memset(msg, 0, sizeof(struct CtdlMessage));
325         msg->cm_magic = CTDLMESSAGE_MAGIC;
326         msg->cm_anon_type = MES_NORMAL;
327         msg->cm_format_type = FMT_RFC822;
328
329         if (ri->guid != NULL) {
330                 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
331         }
332
333         if (ri->author_or_creator != NULL) {
334                 char *From;
335                 StrBuf *Encoded = NULL;
336                 int FromAt;
337                         
338                 From = html_to_ascii(ChrPtr(ri->author_or_creator),
339                                      StrLength(ri->author_or_creator), 
340                                      512, 0);
341                 StrBufPlain(ri->author_or_creator, From, -1);
342                 StrBufTrim(ri->author_or_creator);
343                 free(From);
344
345                 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
346                 if (!FromAt && StrLength (ri->author_email) > 0)
347                 {
348                         StrBufRFC2047encode(&Encoded, ri->author_or_creator);
349                         msg->cm_fields['A'] = SmashStrBuf(&Encoded);
350                         msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
351                 }
352                 else
353                 {
354                         if (FromAt)
355                         {
356                                 msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator);
357                                 msg->cm_fields['P'] = strdup(msg->cm_fields['A']);
358                         }
359                         else 
360                         {
361                                 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
362                                 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
363                                 msg->cm_fields['P'] = strdup("rss@localhost");
364
365                         }
366                         if (ri->pubdate <= 0) {
367                                 ri->pubdate = time(NULL);
368                         }
369                 }
370         }
371         else {
372                 msg->cm_fields['A'] = strdup("rss");
373         }
374
375         msg->cm_fields['N'] = strdup(NODENAME);
376         if (ri->title != NULL) {
377                 long len;
378                 char *Sbj;
379                 StrBuf *Encoded, *QPEncoded;
380
381                 QPEncoded = NULL;
382                 StrBufSpaceToBlank(ri->title);
383                 len = StrLength(ri->title);
384                 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
385                 len = strlen(Sbj);
386                 if (Sbj[len - 1] == '\n')
387                 {
388                         len --;
389                         Sbj[len] = '\0';
390                 }
391                 Encoded = NewStrBufPlain(Sbj, len);
392                 free(Sbj);
393
394                 StrBufTrim(Encoded);
395                 StrBufRFC2047encode(&QPEncoded, Encoded);
396
397                 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
398                 FreeStrBuf(&Encoded);
399         }
400         msg->cm_fields['T'] = malloc(64);
401         snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
402         if (ri->channel_title != NULL) {
403                 if (StrLength(ri->channel_title) > 0) {
404                         msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
405                 }
406         }
407         if (ri->link == NULL) 
408                 ri->link = NewStrBufPlain(HKEY(""));
409
410 #if 0 /* temporarily disable shorter urls. */
411         msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
412 #endif
413
414         msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
415
416         Message = NewStrBufPlain(NULL, StrLength(ri->description));
417
418         StrBufPlain(Message, HKEY(
419                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
420                             "<html><body>\n"));
421 #if 0 /* disable shorter url for now. */
422         msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
423 #endif
424         StrBufAppendBuf(Message, ri->description, 0);
425         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
426
427         AppendLink(Message, ri->link, ri->linkTitle, NULL);
428         AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
429         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
430
431
432
433         networker_save_message *SaveMsg;
434
435         SaveMsg = (networker_save_message *) malloc(sizeof(networker_save_message));
436         memset(SaveMsg, 0, sizeof(networker_save_message));
437         
438         SaveMsg->MsgGUID = guid;
439         SaveMsg->Message = Message;
440         SaveMsg->Msg = msg;
441
442         n = GetCount(Cfg->Messages) + 1;
443         Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage);
444 }
445
446
447
448 /*
449  * Begin a feed parse
450  */
451 int rss_do_fetching(rss_aggregator *Cfg)
452 {
453         rss_item *ri;
454                 
455         time_t now;
456         AsyncIO *IO;
457
458         now = time(NULL);
459
460         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
461                 return 0;
462
463         ri = (rss_item*) malloc(sizeof(rss_item));
464         memset(ri, 0, sizeof(rss_item));
465         Cfg->Item = ri;
466         IO = &Cfg->IO;
467         IO->CitContext = CloneContext(&rss_CC);
468         IO->Data = Cfg;
469
470
471         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
472         ParseURL(&IO->ConnectMe, Cfg->Url, 80);
473         CurlPrepareURL(IO->ConnectMe);
474
475         if (! evcurl_init(IO, 
476 //                        Ctx, 
477                           NULL,
478                           "Citadel RSS Client",
479                           ParseRSSReply, 
480                           RSSAggregatorTerminate,
481                           RSSAggregatorShutdownAbort))
482         {
483                 syslog(LOG_DEBUG, "Unable to initialize libcurl.\n");
484                 return 0;
485         }
486
487         QueueCurlContext(IO);
488         return 1;
489 }
490
491
492 void DeleteRssCfg(void *vptr)
493 {
494         rss_aggregator *rncptr = (rss_aggregator *)vptr;
495         AsyncIO *IO = &rncptr->IO;
496         EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
497
498         FreeStrBuf(&rncptr->Url);
499         FreeStrBuf(&rncptr->rooms);
500         FreeStrBuf(&rncptr->CData);
501         FreeStrBuf(&rncptr->Key);
502         FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
503         DeleteHash(&rncptr->OtherQRnumbers);
504         FreeURL(&rncptr->IO.ConnectMe);
505
506         DeleteHashPos (&rncptr->Pos);
507         DeleteHash (&rncptr->Messages);
508         if (rncptr->recp.recp_room != NULL)
509                 free(rncptr->recp.recp_room);
510
511
512         if (rncptr->Item != NULL)
513         {
514                 FreeStrBuf(&rncptr->Item->guid);
515                 FreeStrBuf(&rncptr->Item->title);
516                 FreeStrBuf(&rncptr->Item->link);
517                 FreeStrBuf(&rncptr->Item->linkTitle);
518                 FreeStrBuf(&rncptr->Item->reLink);
519                 FreeStrBuf(&rncptr->Item->reLinkTitle);
520                 FreeStrBuf(&rncptr->Item->description);
521                 FreeStrBuf(&rncptr->Item->channel_title);
522                 FreeStrBuf(&rncptr->Item->author_or_creator);
523                 FreeStrBuf(&rncptr->Item->author_url);
524                 FreeStrBuf(&rncptr->Item->author_email);
525
526                 free(rncptr->Item);
527         }
528         free(rncptr);
529 }
530
531 eNextState RSSAggregatorTerminate(AsyncIO *IO)
532 {
533         rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
534
535         EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
536
537
538         UnlinkRSSAggregator(rncptr);
539         return eAbort;
540 }
541 eNextState RSSAggregatorShutdownAbort(AsyncIO *IO)
542 {
543         const char *pUrl;
544         rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
545
546         pUrl = IO->ConnectMe->PlainUrl;
547         if (pUrl == NULL)
548                 pUrl = "";
549
550         EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
551
552
553         UnlinkRSSAggregator(rncptr);
554         return eAbort;
555 }
556
557 /*
558  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
559  */
560 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
561 {
562         StrBuf *CfgData=NULL;
563         StrBuf *CfgType;
564         StrBuf *Line;
565         rss_room_counter *Count = NULL;
566         struct stat statbuf;
567         char filename[PATH_MAX];
568         int  fd;
569         int Done;
570         rss_aggregator *rncptr = NULL;
571         rss_aggregator *use_this_rncptr = NULL;
572         void *vptr;
573         const char *CfgPtr, *lPtr;
574         const char *Err;
575
576         pthread_mutex_lock(&RSSQueueMutex);
577         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
578         {
579                 syslog(LOG_DEBUG, 
580                               "rssclient: [%ld] %s already in progress.\n", 
581                               qrbuf->QRnumber, 
582                               qrbuf->QRname);
583                 pthread_mutex_unlock(&RSSQueueMutex);
584                 return;
585         }
586         pthread_mutex_unlock(&RSSQueueMutex);
587
588         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
589
590         if (server_shutting_down)
591                 return;
592                 
593         /* Only do net processing for rooms that have netconfigs */
594         fd = open(filename, 0);
595         if (fd <= 0) {
596                 //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
597                 return;
598         }
599
600         if (server_shutting_down)
601                 return;
602
603         if (fstat(fd, &statbuf) == -1) {
604                 syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
605                        filename, strerror(errno));
606                 return;
607         }
608
609         if (server_shutting_down)
610                 return;
611
612         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
613
614         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
615                 close(fd);
616                 FreeStrBuf(&CfgData);
617                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
618                         filename, strerror(errno));
619                 return;
620         }
621         close(fd);
622         if (server_shutting_down)
623                 return;
624         
625         CfgPtr = NULL;
626         CfgType = NewStrBuf();
627         Line = NewStrBufPlain(NULL, StrLength(CfgData));
628         Done = 0;
629         while (!Done)
630         {
631             Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
632             if (StrLength(Line) > 0)
633             {
634                 lPtr = NULL;
635                 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
636                 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
637                 {
638                     if (Count == NULL)
639                     {
640                         Count = malloc(sizeof(rss_room_counter));
641                         Count->count = 0;
642                     }
643                     Count->count ++;
644                     rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
645                     memset (rncptr, 0, sizeof(rss_aggregator));
646                     rncptr->roomlist_parts = 1;
647                     rncptr->Url = NewStrBuf();
648                     StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
649
650                     pthread_mutex_lock(&RSSQueueMutex);
651                     GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
652                     use_this_rncptr = (rss_aggregator *)vptr;
653                     if (use_this_rncptr != NULL)
654                     {
655                             long *QRnumber;
656                             StrBufAppendBufPlain(use_this_rncptr->rooms, 
657                                                  qrbuf->QRname, 
658                                                  -1, 0);
659                             if (use_this_rncptr->roomlist_parts == 1)
660                             {
661                                     use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
662                             }
663                             QRnumber = (long*)malloc(sizeof(long));
664                             *QRnumber = qrbuf->QRnumber;
665                             Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
666                             use_this_rncptr->roomlist_parts++;
667
668                             pthread_mutex_unlock(&RSSQueueMutex);
669
670                             FreeStrBuf(&rncptr->Url);
671                             free(rncptr);
672                             rncptr = NULL;
673                             continue;
674                     }
675                     pthread_mutex_unlock(&RSSQueueMutex);
676
677                     rncptr->ItemType = RSS_UNSET;
678                                 
679                     rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
680
681                     pthread_mutex_lock(&RSSQueueMutex);
682                     Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
683                     pthread_mutex_unlock(&RSSQueueMutex);
684                 }
685             }
686         }
687         if (Count != NULL)
688         {
689                 Count->QRnumber = qrbuf->QRnumber;
690                 pthread_mutex_lock(&RSSQueueMutex);
691                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", 
692                               qrbuf->QRnumber, qrbuf->QRname);
693                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
694                 pthread_mutex_unlock(&RSSQueueMutex);
695         }
696         FreeStrBuf(&CfgData);
697         FreeStrBuf(&CfgType);
698         FreeStrBuf(&Line);
699 }
700
701 /*
702  * Scan for rooms that have RSS client requests configured
703  */
704 void rssclient_scan(void) {
705         static int doing_rssclient = 0;
706         rss_aggregator *rptr = NULL;
707         void *vrptr = NULL;
708         HashPos  *it;
709         long len;
710         const char *Key;
711
712         /* Run no more than once every 15 minutes. */
713         if ((time(NULL) - last_run) < 900) {
714                 return;
715         }
716
717         /*
718          * This is a simple concurrency check to make sure only one rssclient run
719          * is done at a time.  We could do this with a mutex, but since we
720          * don't really require extremely fine granularity here, we'll do it
721          * with a static variable instead.
722          */
723         if (doing_rssclient) return;
724         doing_rssclient = 1;
725         if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
726                 return;
727
728         syslog(LOG_DEBUG, "rssclient started\n");
729         CtdlForEachRoom(rssclient_scan_room, NULL);
730
731         pthread_mutex_lock(&RSSQueueMutex);
732
733         it = GetNewHashPos(RSSFetchUrls, 0);
734         while (!server_shutting_down &&
735                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && 
736                (vrptr != NULL)) {
737                 rptr = (rss_aggregator *)vrptr;
738                 if (!rss_do_fetching(rptr))
739                         UnlinkRSSAggregator(rptr);
740         }
741         DeleteHashPos(&it);
742         pthread_mutex_unlock(&RSSQueueMutex);
743
744         syslog(LOG_DEBUG, "rssclient ended\n");
745         doing_rssclient = 0;
746         return;
747 }
748
749 void rss_cleanup(void)
750 {
751         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
752         DeleteHash(&RSSFetchUrls);
753         DeleteHash(&RSSQueueRooms);
754 }
755
756
757 CTDL_MODULE_INIT(rssclient)
758 {
759         if (threading)
760         {
761                 CtdlFillSystemContext(&rss_CC, "rssclient");
762                 pthread_mutex_init(&RSSQueueMutex, NULL);
763                 RSSQueueRooms = NewHash(1, lFlathash);
764                 RSSFetchUrls = NewHash(1, NULL);
765                 syslog(LOG_INFO, "%s\n", curl_version());
766                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
767                 CtdlRegisterCleanupHook(rss_cleanup);
768         }
769         return "rssclient";
770 }