Work on RSS Feed
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2010 by the citadel.org team
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35
36 #include <ctype.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <expat.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
44 #include "citadel.h"
45 #include "server.h"
46 #include "citserver.h"
47 #include "support.h"
48 #include "config.h"
49 #include "threads.h"
50 #include "ctdl_module.h"
51 #include "msgbase.h"
52 #include "parsedate.h"
53 #include "database.h"
54 #include "citadel_dirs.h"
55 #include "md5.h"
56 #include "context.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
59
60
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
64
65 time_t last_run = 0L;
66
67 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
68 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
69 HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */
70
71 eNextState RSSAggregatorTerminate(AsyncIO *IO);
72
73 struct CitContext rss_CC;
74
75 struct rssnetcfg *rnclist = NULL;
76 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
77 {
78         if (StrLength(link) > 0)
79         {
80                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
81                 StrBufAppendBuf(Message, link, 0);
82                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
83                 if (StrLength(LinkTitle) > 0)
84                         StrBufAppendBuf(Message, LinkTitle, 0);
85                 else if ((Title != NULL) && !IsEmptyStr(Title))
86                         StrBufAppendBufPlain(Message, Title, -1, 0);
87                 else
88                         StrBufAppendBuf(Message, link, 0);
89                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
90         }
91 }
92
93
94 void DeleteRoomReference(long QRnumber)
95 {
96         HashPos *At;
97         long HKLen;
98         const char *HK;
99         void *vData = NULL;
100         rss_room_counter *pRoomC;
101
102         At = GetNewHashPos(RSSQueueRooms, 0);
103
104         GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At);
105         GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
106         if (vData != NULL)
107         {
108                 pRoomC = (rss_room_counter *) vData;
109                 pRoomC->count --;
110                 if (pRoomC->count == 0)
111                         DeleteEntryFromHash(RSSQueueRooms, At);
112         }
113         DeleteHashPos(&At);
114 }
115
116 void UnlinkRooms(rss_aggregator *Cfg)
117 {
118         
119         DeleteRoomReference(Cfg->QRnumber);
120         if (Cfg->OtherQRnumbers != NULL)
121         {
122                 long HKLen;
123                 const char *HK;
124                 HashPos *At;
125                 void *vData;
126
127                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
128                 while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) && 
129                        (vData != NULL))
130                 {
131                         long *lData = (long*) vData;
132                         DeleteRoomReference(*lData);
133                 }
134 /*
135                 if (server_shutting_down)
136                         break; / * TODO */
137
138                 DeleteHashPos(&At);
139         }
140 }
141
142 void UnlinkRSSAggregator(rss_aggregator *Cfg)
143 {
144         HashPos *At;
145
146         UnlinkRooms(Cfg);
147
148         At = GetNewHashPos(RSSFetchUrls, 0);
149         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At) == 0)
150         {
151                 DeleteEntryFromHash(RSSFetchUrls, At);
152         }
153         DeleteHashPos(&At);
154         last_run = time(NULL);
155 }
156 /*
157 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
158 {
159         networker_save_message *Ctx = (networker_save_message *) IO->Data;
160
161         pthread_mutex_lock(&RSSQueueMutex);
162         Ctx->Cfg->RefCount --;
163
164         if (Ctx->Cfg->RefCount == 0)
165         {
166                 UnlinkRSSAggregator(Ctx->Cfg);
167
168         }
169         pthread_mutex_unlock(&RSSQueueMutex);
170
171         CtdlFreeMessage(Ctx->Msg);
172         free_recipients(Ctx->recp);
173         FreeStrBuf(&Ctx->Message);
174         FreeStrBuf(&Ctx->MsgGUID);
175         ((struct CitContext*)IO->CitContext)->state = CON_IDLE;
176         ((struct CitContext*)IO->CitContext)->kill_me = 1;
177         free(Ctx);
178         last_run = time(NULL);
179         return eAbort;
180 }
181 */
182 void FreeNetworkSaveMessage (void *vMsg)
183 {
184         networker_save_message *Msg = (networker_save_message *) vMsg;
185
186         CtdlFreeMessage(Msg->Msg);
187         FreeStrBuf(&Msg->Message);
188         FreeStrBuf(&Msg->MsgGUID);
189         free(Msg);
190 }
191
192 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
193 {
194         return eAbort; ///TODO
195 }
196
197 eNextState RSSSaveMessage(AsyncIO *IO)
198 {
199         long len;
200         const char *Key;
201         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
202
203         Ctx->ThisMsg->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message);
204
205         CtdlSubmitMsg(Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0);
206
207         /* write the uidl to the use table so we don't store this item again */
208         cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
209         
210         if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
211                 return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
212         else
213                 return eAbort;
214 }
215
216 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
217 {
218         const char *Key;
219         long len;
220         struct cdbdata *cdbut;
221         rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
222
223
224         /* Find out if we've already seen this item */
225         strcpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
226         Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
227
228         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
229 #ifndef DEBUG_RSS
230         if (cdbut != NULL) {
231                 /* Item has already been seen */
232                 syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
233                 cdb_free(cdbut);
234
235                 /* rewrite the record anyway, to update the timestamp */
236                 cdb_store(CDB_USETABLE, 
237                           SKEY(Ctx->ThisMsg->MsgGUID), 
238                           &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
239
240                 if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
241                         return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
242                 else
243                         return eAbort;
244         }
245         else
246 #endif
247         {
248                 NextDBOperation(IO, RSSSaveMessage);
249                 return eSendMore;
250         }
251 }
252 /*
253 void RSSAddSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregat *Cfg)
254 {
255         networker_save_message *Ctx;
256
257         pthread_mutex_lock(&RSSQueueMutex);
258         Cfg->RefCount ++;
259         pthread_mutex_unlock(&RSSQueueMutex);
260
261
262         Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
263         memset(Ctx, 0, sizeof(networker_save_message));
264         
265         Ctx->MsgGUID = MsgGUID;
266         Ctx->Message = MessageBody;
267         Ctx->Msg = Msg;
268         Ctx->Cfg = Cfg;
269         Ctx->recp = recp;
270         Ctx->IO.Data = Ctx;
271         Ctx->IO.CitContext = CloneContext(&rss_CC);
272         Ctx->IO.Terminate = FreeNetworkSaveMessage;
273         Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
274         QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
275 }
276 */
277
278 /*
279  * Commit a fetched and parsed RSS item to disk
280  */
281 void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
282 {
283
284         struct MD5Context md5context;
285         u_char rawdigest[MD5_DIGEST_LEN];
286         struct CtdlMessage *msg;
287         int msglen = 0;
288         StrBuf *Message;
289         StrBuf *guid;
290
291         int n;
292    
293         /* Construct a GUID to use in the S_USETABLE table.
294          * If one is not present in the item itself, make one up.
295          */
296         if (ri->guid != NULL) {
297                 StrBufSpaceToBlank(ri->guid);
298                 StrBufTrim(ri->guid);
299                 guid = NewStrBufPlain(HKEY("rss/"));
300                 StrBufAppendBuf(guid, ri->guid, 0);
301         }
302         else {
303                 MD5Init(&md5context);
304                 if (ri->title != NULL) {
305                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
306                 }
307                 if (ri->link != NULL) {
308                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
309                 }
310                 MD5Final(rawdigest, &md5context);
311                 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
312                 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
313                 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
314         }
315
316         /* translate Item into message. */
317         syslog(LOG_DEBUG, "RSS: translating item...\n");
318         if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
319         StrBufSpaceToBlank(ri->description);
320         msg = malloc(sizeof(struct CtdlMessage));
321         memset(msg, 0, sizeof(struct CtdlMessage));
322         msg->cm_magic = CTDLMESSAGE_MAGIC;
323         msg->cm_anon_type = MES_NORMAL;
324         msg->cm_format_type = FMT_RFC822;
325
326         if (ri->guid != NULL) {
327                 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
328         }
329
330         if (ri->author_or_creator != NULL) {
331                 char *From;
332                 StrBuf *Encoded = NULL;
333                 int FromAt;
334                         
335                 From = html_to_ascii(ChrPtr(ri->author_or_creator),
336                                      StrLength(ri->author_or_creator), 
337                                      512, 0);
338                 StrBufPlain(ri->author_or_creator, From, -1);
339                 StrBufTrim(ri->author_or_creator);
340                 free(From);
341
342                 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
343                 if (!FromAt && StrLength (ri->author_email) > 0)
344                 {
345                         StrBufRFC2047encode(&Encoded, ri->author_or_creator);
346                         msg->cm_fields['A'] = SmashStrBuf(&Encoded);
347                         msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
348                 }
349                 else
350                 {
351                         if (FromAt)
352                         {
353                                 msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator);
354                                 msg->cm_fields['P'] = strdup(msg->cm_fields['A']);
355                         }
356                         else 
357                         {
358                                 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
359                                 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
360                                 msg->cm_fields['P'] = strdup("rss@localhost");
361
362                         }
363                         if (ri->pubdate <= 0) {
364                                 ri->pubdate = time(NULL);
365                         }
366                 }
367         }
368         else {
369                 msg->cm_fields['A'] = strdup("rss");
370         }
371
372         msg->cm_fields['N'] = strdup(NODENAME);
373         if (ri->title != NULL) {
374                 long len;
375                 char *Sbj;
376                 StrBuf *Encoded, *QPEncoded;
377
378                 QPEncoded = NULL;
379                 StrBufSpaceToBlank(ri->title);
380                 len = StrLength(ri->title);
381                 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
382                 len = strlen(Sbj);
383                 if (Sbj[len - 1] == '\n')
384                 {
385                         len --;
386                         Sbj[len] = '\0';
387                 }
388                 Encoded = NewStrBufPlain(Sbj, len);
389                 free(Sbj);
390
391                 StrBufTrim(Encoded);
392                 StrBufRFC2047encode(&QPEncoded, Encoded);
393
394                 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
395                 FreeStrBuf(&Encoded);
396         }
397         msg->cm_fields['T'] = malloc(64);
398         snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
399         if (ri->channel_title != NULL) {
400                 if (StrLength(ri->channel_title) > 0) {
401                         msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
402                 }
403         }
404         if (ri->link == NULL) 
405                 ri->link = NewStrBufPlain(HKEY(""));
406
407 #if 0 /* temporarily disable shorter urls. */
408         msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
409 #endif
410
411         msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
412
413         Message = NewStrBufPlain(NULL, StrLength(ri->description));
414
415         StrBufPlain(Message, HKEY(
416                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
417                             "<html><body>\n"));
418 #if 0 /* disable shorter url for now. */
419         msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
420 #endif
421         StrBufAppendBuf(Message, ri->description, 0);
422         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
423
424         AppendLink(Message, ri->link, ri->linkTitle, NULL);
425         AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
426         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
427
428
429
430         networker_save_message *SaveMsg;
431
432         SaveMsg = (networker_save_message *) malloc(sizeof(networker_save_message));
433         memset(SaveMsg, 0, sizeof(networker_save_message));
434         
435         SaveMsg->MsgGUID = guid;
436         SaveMsg->Message = Message;
437         SaveMsg->Msg = msg;
438
439         n = GetCount(Cfg->Messages) + 1;
440         Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage);
441 }
442
443
444
445 /*
446  * Begin a feed parse
447  */
448 int rss_do_fetching(rss_aggregator *Cfg)
449 {
450         rss_item *ri;
451                 
452         time_t now;
453         AsyncIO *IO;
454
455         now = time(NULL);
456
457         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
458                 return 0;
459         Cfg->RefCount++;
460
461         ri = (rss_item*) malloc(sizeof(rss_item));
462         memset(ri, 0, sizeof(rss_item));
463         Cfg->Item = ri;
464         IO = &Cfg->IO;
465         IO->CitContext = CloneContext(&rss_CC);
466         IO->Data = Cfg;
467
468
469         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
470         ParseURL(&IO->ConnectMe, Cfg->Url, 80);
471         CurlPrepareURL(IO->ConnectMe);
472
473         if (! evcurl_init(IO, 
474 //                        Ctx, 
475                           NULL,
476                           "Citadel RSS Client",
477                           ParseRSSReply, 
478                           RSSAggregatorTerminate))
479         {
480                 syslog(LOG_DEBUG, "Unable to initialize libcurl.\n");
481                 return 0;
482         }
483
484         QueueCurlContext(IO);
485         return 1;
486 }
487
488
489 void DeleteRssCfg(void *vptr)
490 {
491         rss_aggregator *rncptr = (rss_aggregator *)vptr;
492
493         FreeStrBuf(&rncptr->Url);
494         FreeStrBuf(&rncptr->rooms);
495         FreeStrBuf(&rncptr->CData);
496         FreeStrBuf(&rncptr->Key);
497         FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
498         DeleteHash(&rncptr->OtherQRnumbers);
499         FreeURL(&rncptr->IO.ConnectMe);
500
501         if (rncptr->Item != NULL)
502         {
503                 FreeStrBuf(&rncptr->Item->guid);
504                 FreeStrBuf(&rncptr->Item->title);
505                 FreeStrBuf(&rncptr->Item->link);
506                 FreeStrBuf(&rncptr->Item->linkTitle);
507                 FreeStrBuf(&rncptr->Item->reLink);
508                 FreeStrBuf(&rncptr->Item->reLinkTitle);
509                 FreeStrBuf(&rncptr->Item->description);
510                 FreeStrBuf(&rncptr->Item->channel_title);
511                 FreeStrBuf(&rncptr->Item->author_or_creator);
512                 FreeStrBuf(&rncptr->Item->author_url);
513                 FreeStrBuf(&rncptr->Item->author_email);
514
515                 free(rncptr->Item);
516         }
517         free(rncptr);
518 }
519
520 eNextState RSSAggregatorTerminate(AsyncIO *IO)
521 {
522         rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
523         
524         HashPos *At;
525         long HKLen;
526         const char *HK;
527         void *vData;
528
529         pthread_mutex_lock(&RSSQueueMutex);
530         rncptr->RefCount --;
531         if (rncptr->RefCount == 0)
532         {
533                 UnlinkRSSAggregator(rncptr);
534
535         }
536         pthread_mutex_unlock(&RSSQueueMutex);
537
538         At = GetNewHashPos(RSSFetchUrls, 0);
539
540         pthread_mutex_lock(&RSSQueueMutex);
541         GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At);
542         GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData);
543         DeleteEntryFromHash(RSSFetchUrls, At);
544         pthread_mutex_unlock(&RSSQueueMutex);
545         DeleteHashPos (&rncptr->Pos);
546         DeleteHash (&rncptr->Messages);
547         if (rncptr->recp.recp_room != NULL)
548                 free(rncptr->recp.recp_room);
549         DeleteHashPos(&At);
550         return eAbort;
551 }
552
553 /*
554  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
555  */
556 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
557 {
558         StrBuf *CfgData=NULL;
559         StrBuf *CfgType;
560         StrBuf *Line;
561         rss_room_counter *Count = NULL;
562         struct stat statbuf;
563         char filename[PATH_MAX];
564         int  fd;
565         int Done;
566         rss_aggregator *rncptr = NULL;
567         rss_aggregator *use_this_rncptr = NULL;
568         void *vptr;
569         const char *CfgPtr, *lPtr;
570         const char *Err;
571
572         pthread_mutex_lock(&RSSQueueMutex);
573         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
574         {
575                 syslog(LOG_DEBUG, 
576                               "rssclient: [%ld] %s already in progress.\n", 
577                               qrbuf->QRnumber, 
578                               qrbuf->QRname);
579                 pthread_mutex_unlock(&RSSQueueMutex);
580                 return;
581         }
582         pthread_mutex_unlock(&RSSQueueMutex);
583
584         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
585
586         if (server_shutting_down)
587                 return;
588                 
589         /* Only do net processing for rooms that have netconfigs */
590         fd = open(filename, 0);
591         if (fd <= 0) {
592                 //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
593                 return;
594         }
595
596         if (server_shutting_down)
597                 return;
598
599         if (fstat(fd, &statbuf) == -1) {
600                 syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
601                        filename, strerror(errno));
602                 return;
603         }
604
605         if (server_shutting_down)
606                 return;
607
608         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
609
610         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
611                 close(fd);
612                 FreeStrBuf(&CfgData);
613                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
614                         filename, strerror(errno));
615                 return;
616         }
617         close(fd);
618         if (server_shutting_down)
619                 return;
620         
621         CfgPtr = NULL;
622         CfgType = NewStrBuf();
623         Line = NewStrBufPlain(NULL, StrLength(CfgData));
624         Done = 0;
625         while (!Done)
626         {
627             Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
628             if (StrLength(Line) > 0)
629             {
630                 lPtr = NULL;
631                 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
632                 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
633                 {
634                     if (Count == NULL)
635                     {
636                         Count = malloc(sizeof(rss_room_counter));
637                         Count->count = 0;
638                     }
639                     Count->count ++;
640                     rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
641                     memset (rncptr, 0, sizeof(rss_aggregator));
642                     rncptr->roomlist_parts = 1;
643                     rncptr->Url = NewStrBuf();
644                     StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
645
646                     pthread_mutex_lock(&RSSQueueMutex);
647                     GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
648                     use_this_rncptr = (rss_aggregator *)vptr;
649                     if (use_this_rncptr != NULL)
650                     {
651                             /* mustn't attach to an active session */
652                             if (use_this_rncptr->RefCount > 0)
653                             {
654                                     DeleteRssCfg(rncptr);
655                                     Count->count--;
656                             }
657                             else 
658                             {
659                                     long *QRnumber;
660                                     StrBufAppendBufPlain(use_this_rncptr->rooms, 
661                                                          qrbuf->QRname, 
662                                                          -1, 0);
663                                     if (use_this_rncptr->roomlist_parts == 1)
664                                     {
665                                             use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
666                                     }
667                                     QRnumber = (long*)malloc(sizeof(long));
668                                     *QRnumber = qrbuf->QRnumber;
669                                     Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
670                                     use_this_rncptr->roomlist_parts++;
671                             }
672                             pthread_mutex_unlock(&RSSQueueMutex);
673
674
675                             FreeStrBuf(&rncptr->Url);
676                             free(rncptr);
677                             rncptr = NULL;
678                             continue;
679                     }
680                     pthread_mutex_unlock(&RSSQueueMutex);
681
682                     rncptr->ItemType = RSS_UNSET;
683                                 
684                     rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
685
686                     pthread_mutex_lock(&RSSQueueMutex);
687                     Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
688                     pthread_mutex_unlock(&RSSQueueMutex);
689                 }
690             }
691         }
692         if (Count != NULL)
693         {
694                 Count->QRnumber = qrbuf->QRnumber;
695                 pthread_mutex_lock(&RSSQueueMutex);
696                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", 
697                               qrbuf->QRnumber, qrbuf->QRname);
698                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
699                 pthread_mutex_unlock(&RSSQueueMutex);
700         }
701         FreeStrBuf(&CfgData);
702         FreeStrBuf(&CfgType);
703         FreeStrBuf(&Line);
704 }
705
706 /*
707  * Scan for rooms that have RSS client requests configured
708  */
709 void rssclient_scan(void) {
710         static int doing_rssclient = 0;
711         rss_aggregator *rptr = NULL;
712         void *vrptr = NULL;
713         HashPos  *it;
714         long len;
715         const char *Key;
716
717         /* Run no more than once every 15 minutes. */
718         if ((time(NULL) - last_run) < 900) {
719                 return;
720         }
721
722         /*
723          * This is a simple concurrency check to make sure only one rssclient run
724          * is done at a time.  We could do this with a mutex, but since we
725          * don't really require extremely fine granularity here, we'll do it
726          * with a static variable instead.
727          */
728         if (doing_rssclient) return;
729         doing_rssclient = 1;
730
731         syslog(LOG_DEBUG, "rssclient started\n");
732         CtdlForEachRoom(rssclient_scan_room, NULL);
733
734         pthread_mutex_lock(&RSSQueueMutex);
735
736         it = GetNewHashPos(RSSFetchUrls, 0);
737         while (!server_shutting_down &&
738                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && 
739                (vrptr != NULL)) {
740                 rptr = (rss_aggregator *)vrptr;
741                 if (rptr->RefCount == 0) 
742                         if (!rss_do_fetching(rptr))
743                                 UnlinkRSSAggregator(rptr);
744         }
745         DeleteHashPos(&it);
746         pthread_mutex_unlock(&RSSQueueMutex);
747
748         syslog(LOG_DEBUG, "rssclient ended\n");
749         doing_rssclient = 0;
750         return;
751 }
752
753 void rss_cleanup(void)
754 {
755         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
756         DeleteHash(&RSSFetchUrls);
757         DeleteHash(&RSSQueueRooms);
758 }
759
760
761 CTDL_MODULE_INIT(rssclient)
762 {
763         if (threading)
764         {
765                 CtdlFillSystemContext(&rss_CC, "rssclient");
766                 pthread_mutex_init(&RSSQueueMutex, NULL);
767                 RSSQueueRooms = NewHash(1, lFlathash);
768                 RSSFetchUrls = NewHash(1, NULL);
769                 syslog(LOG_INFO, "%s\n", curl_version());
770                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
771                 CtdlRegisterCleanupHook(rss_cleanup);
772         }
773         return "rssclient";
774 }