finalize RSS/curlev implementation
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2010 by the citadel.org team
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35
36 #include <ctype.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <expat.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
44 #include "citadel.h"
45 #include "server.h"
46 #include "citserver.h"
47 #include "support.h"
48 #include "config.h"
49 #include "threads.h"
50 #include "ctdl_module.h"
51 #include "msgbase.h"
52 #include "parsedate.h"
53 #include "database.h"
54 #include "citadel_dirs.h"
55 #include "md5.h"
56 #include "context.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
59
60
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
64
65 time_t last_run = 0L;
66
67 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
68 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
69 HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */
70
71 eNextState RSSAggregatorTerminate(AsyncIO *IO);
72
73 struct CitContext rss_CC;
74
75 struct rssnetcfg *rnclist = NULL;
76 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
77 {
78         if (StrLength(link) > 0)
79         {
80                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
81                 StrBufAppendBuf(Message, link, 0);
82                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
83                 if (StrLength(LinkTitle) > 0)
84                         StrBufAppendBuf(Message, LinkTitle, 0);
85                 else if ((Title != NULL) && !IsEmptyStr(Title))
86                         StrBufAppendBufPlain(Message, Title, -1, 0);
87                 else
88                         StrBufAppendBuf(Message, link, 0);
89                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
90         }
91 }
92 typedef struct __networker_save_message {
93         AsyncIO IO;
94         struct CtdlMessage *Msg;
95         struct recptypes *recp;
96         rss_aggregator *Cfg;
97         StrBuf *MsgGUID;
98         StrBuf *Message;
99         struct UseTable ut;
100 } networker_save_message;
101
102
103 void DeleteRoomReference(long QRnumber)
104 {
105         HashPos *At;
106         long HKLen;
107         const char *HK;
108         void *vData = NULL;
109         rss_room_counter *pRoomC;
110
111         At = GetNewHashPos(RSSQueueRooms, 0);
112
113         GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At);
114         GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
115         if (vData != NULL)
116         {
117                 pRoomC = (rss_room_counter *) vData;
118                 pRoomC->count --;
119                 if (pRoomC->count == 0)
120                         DeleteEntryFromHash(RSSQueueRooms, At);
121         }
122         DeleteHashPos(&At);
123 }
124
125 void UnlinkRooms(rss_aggregator *Cfg)
126 {
127         
128         DeleteRoomReference(Cfg->QRnumber);
129         if (Cfg->OtherQRnumbers != NULL)
130         {
131                 long HKLen;
132                 const char *HK;
133                 HashPos *At;
134                 void *vData;
135
136                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
137                 while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) && 
138                        (vData != NULL))
139                 {
140                         long *lData = (long*) vData;
141                         DeleteRoomReference(*lData);
142                 }
143 /*
144                 if (server_shutting_down)
145                         break; / * TODO */
146
147                 DeleteHashPos(&At);
148         }
149 }
150
151 void UnlinkRSSAggregator(rss_aggregator *Cfg)
152 {
153         HashPos *At;
154
155         UnlinkRooms(Cfg);
156
157         At = GetNewHashPos(RSSFetchUrls, 0);
158         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At) == 0)
159         {
160                 DeleteEntryFromHash(RSSFetchUrls, At);
161         }
162         DeleteHashPos(&At);
163         last_run = time(NULL);
164 }
165
166 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
167 {
168         networker_save_message *Ctx = (networker_save_message *) IO->Data;
169
170         pthread_mutex_lock(&RSSQueueMutex);
171         Ctx->Cfg->RefCount --;
172
173         if (Ctx->Cfg->RefCount == 0)
174         {
175                 UnlinkRSSAggregator(Ctx->Cfg);
176
177         }
178         pthread_mutex_unlock(&RSSQueueMutex);
179
180         CtdlFreeMessage(Ctx->Msg);
181         free_recipients(Ctx->recp);
182         FreeStrBuf(&Ctx->Message);
183         FreeStrBuf(&Ctx->MsgGUID);
184         ((struct CitContext*)IO->CitContext)->state = CON_IDLE;
185         ((struct CitContext*)IO->CitContext)->kill_me = 1;
186         free(Ctx);
187         last_run = time(NULL);
188         return eAbort;
189 }
190
191 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
192 {
193         return eAbort; ///TODO
194 }
195
196 eNextState RSSSaveMessage(AsyncIO *IO)
197 {
198         networker_save_message *Ctx = (networker_save_message *) IO->Data;
199
200         Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message);
201
202         CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0);
203
204         /* write the uidl to the use table so we don't store this item again */
205         cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) );
206
207         return eTerminateConnection;
208 }
209
210 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
211 {
212         struct cdbdata *cdbut;
213         networker_save_message *Ctx = (networker_save_message *) IO->Data;
214
215         /* Find out if we've already seen this item */
216         strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
217         Ctx->ut.ut_timestamp = time(NULL);
218
219         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
220 #ifndef DEBUG_RSS
221         if (cdbut != NULL) {
222                 /* Item has already been seen */
223                 syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
224                 cdb_free(cdbut);
225
226                 /* rewrite the record anyway, to update the timestamp */
227                 cdb_store(CDB_USETABLE, 
228                           SKEY(Ctx->MsgGUID), 
229                           &Ctx->ut, sizeof(struct UseTable) );
230                 return eAbort;
231         }
232         else
233 #endif
234         {
235                 NextDBOperation(IO, RSSSaveMessage);
236                 return eSendMore;
237         }
238 }
239 void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregator *Cfg)
240 {
241         networker_save_message *Ctx;
242
243         pthread_mutex_lock(&RSSQueueMutex);
244         Cfg->RefCount ++;
245         pthread_mutex_unlock(&RSSQueueMutex);
246
247
248         Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
249         memset(Ctx, 0, sizeof(networker_save_message));
250         
251         Ctx->MsgGUID = MsgGUID;
252         Ctx->Message = MessageBody;
253         Ctx->Msg = Msg;
254         Ctx->Cfg = Cfg;
255         Ctx->recp = recp;
256         Ctx->IO.Data = Ctx;
257         Ctx->IO.CitContext = CloneContext(&rss_CC);
258         Ctx->IO.Terminate = FreeNetworkSaveMessage;
259         Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
260         QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
261 }
262
263
264 /*
265  * Commit a fetched and parsed RSS item to disk
266  */
267 void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
268 {
269
270         struct MD5Context md5context;
271         u_char rawdigest[MD5_DIGEST_LEN];
272         struct CtdlMessage *msg;
273         struct recptypes *recp = NULL;
274         int msglen = 0;
275         StrBuf *Message;
276         StrBuf *guid;
277         StrBuf *Buf;
278
279         recp = (struct recptypes *) malloc(sizeof(struct recptypes));
280         if (recp == NULL) return;
281         memset(recp, 0, sizeof(struct recptypes));
282         Buf = NewStrBufDup(Cfg->rooms);
283         recp->recp_room = SmashStrBuf(&Buf);
284         recp->num_room = Cfg->roomlist_parts;
285         recp->recptypes_magic = RECPTYPES_MAGIC;
286    
287         Cfg->RefCount ++;
288         /* Construct a GUID to use in the S_USETABLE table.
289          * If one is not present in the item itself, make one up.
290          */
291         if (ri->guid != NULL) {
292                 StrBufSpaceToBlank(ri->guid);
293                 StrBufTrim(ri->guid);
294                 guid = NewStrBufPlain(HKEY("rss/"));
295                 StrBufAppendBuf(guid, ri->guid, 0);
296         }
297         else {
298                 MD5Init(&md5context);
299                 if (ri->title != NULL) {
300                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
301                 }
302                 if (ri->link != NULL) {
303                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
304                 }
305                 MD5Final(rawdigest, &md5context);
306                 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
307                 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
308                 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
309         }
310
311         /* translate Item into message. */
312         syslog(LOG_DEBUG, "RSS: translating item...\n");
313         if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
314         StrBufSpaceToBlank(ri->description);
315         msg = malloc(sizeof(struct CtdlMessage));
316         memset(msg, 0, sizeof(struct CtdlMessage));
317         msg->cm_magic = CTDLMESSAGE_MAGIC;
318         msg->cm_anon_type = MES_NORMAL;
319         msg->cm_format_type = FMT_RFC822;
320
321         if (ri->guid != NULL) {
322                 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
323         }
324
325         if (ri->author_or_creator != NULL) {
326                 char *From;
327                 StrBuf *Encoded = NULL;
328                 int FromAt;
329                         
330                 From = html_to_ascii(ChrPtr(ri->author_or_creator),
331                                      StrLength(ri->author_or_creator), 
332                                      512, 0);
333                 StrBufPlain(ri->author_or_creator, From, -1);
334                 StrBufTrim(ri->author_or_creator);
335                 free(From);
336
337                 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
338                 if (!FromAt && StrLength (ri->author_email) > 0)
339                 {
340                         StrBufRFC2047encode(&Encoded, ri->author_or_creator);
341                         msg->cm_fields['A'] = SmashStrBuf(&Encoded);
342                         msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
343                 }
344                 else
345                 {
346                         if (FromAt)
347                         {
348                                 msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator);
349                                 msg->cm_fields['P'] = strdup(msg->cm_fields['A']);
350                         }
351                         else 
352                         {
353                                 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
354                                 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
355                                 msg->cm_fields['P'] = strdup("rss@localhost");
356
357                         }
358                         if (ri->pubdate <= 0) {
359                                 ri->pubdate = time(NULL);
360                         }
361                 }
362         }
363         else {
364                 msg->cm_fields['A'] = strdup("rss");
365         }
366
367         msg->cm_fields['N'] = strdup(NODENAME);
368         if (ri->title != NULL) {
369                 long len;
370                 char *Sbj;
371                 StrBuf *Encoded, *QPEncoded;
372
373                 QPEncoded = NULL;
374                 StrBufSpaceToBlank(ri->title);
375                 len = StrLength(ri->title);
376                 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
377                 len = strlen(Sbj);
378                 if (Sbj[len - 1] == '\n')
379                 {
380                         len --;
381                         Sbj[len] = '\0';
382                 }
383                 Encoded = NewStrBufPlain(Sbj, len);
384                 free(Sbj);
385
386                 StrBufTrim(Encoded);
387                 StrBufRFC2047encode(&QPEncoded, Encoded);
388
389                 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
390                 FreeStrBuf(&Encoded);
391         }
392         msg->cm_fields['T'] = malloc(64);
393         snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
394         if (ri->channel_title != NULL) {
395                 if (StrLength(ri->channel_title) > 0) {
396                         msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
397                 }
398         }
399         if (ri->link == NULL) 
400                 ri->link = NewStrBufPlain(HKEY(""));
401
402 #if 0 /* temporarily disable shorter urls. */
403         msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
404 #endif
405
406         msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
407
408         Message = NewStrBufPlain(NULL, StrLength(ri->description));
409
410         StrBufPlain(Message, HKEY(
411                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
412                             "<html><body>\n"));
413 #if 0 /* disable shorter url for now. */
414         msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
415 #endif
416         StrBufAppendBuf(Message, ri->description, 0);
417         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
418
419         AppendLink(Message, ri->link, ri->linkTitle, NULL);
420         AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
421         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
422
423         RSSQueueSaveMessage(msg, recp, guid, Message, Cfg);
424 }
425
426
427
428 /*
429  * Begin a feed parse
430  */
431 int rss_do_fetching(rss_aggregator *Cfg)
432 {
433         rss_item *ri;
434                 
435         time_t now;
436         AsyncIO *IO;
437
438         now = time(NULL);
439
440         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
441                 return 0;
442         Cfg->RefCount++;
443
444         ri = (rss_item*) malloc(sizeof(rss_item));
445         memset(ri, 0, sizeof(rss_item));
446         Cfg->Item = ri;
447         IO = &Cfg->IO;
448         IO->CitContext = CloneContext(&rss_CC);
449         IO->Data = Cfg;
450
451
452         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
453         ParseURL(&IO->ConnectMe, Cfg->Url, 80);
454         CurlPrepareURL(IO->ConnectMe);
455
456         if (! evcurl_init(IO, 
457 //                        Ctx, 
458                           NULL,
459                           "Citadel RSS Client",
460                           ParseRSSReply, 
461                           RSSAggregatorTerminate))
462         {
463                 syslog(LOG_DEBUG, "Unable to initialize libcurl.\n");
464                 return 0;
465         }
466
467         QueueCurlContext(IO);
468         return 1;
469 }
470
471
472 void DeleteRssCfg(void *vptr)
473 {
474         rss_aggregator *rncptr = (rss_aggregator *)vptr;
475
476         FreeStrBuf(&rncptr->Url);
477         FreeStrBuf(&rncptr->rooms);
478         FreeStrBuf(&rncptr->CData);
479         FreeStrBuf(&rncptr->Key);
480         FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
481         DeleteHash(&rncptr->OtherQRnumbers);
482         FreeURL(&rncptr->IO.ConnectMe);
483
484         if (rncptr->Item != NULL)
485         {
486                 FreeStrBuf(&rncptr->Item->guid);
487                 FreeStrBuf(&rncptr->Item->title);
488                 FreeStrBuf(&rncptr->Item->link);
489                 FreeStrBuf(&rncptr->Item->linkTitle);
490                 FreeStrBuf(&rncptr->Item->reLink);
491                 FreeStrBuf(&rncptr->Item->reLinkTitle);
492                 FreeStrBuf(&rncptr->Item->description);
493                 FreeStrBuf(&rncptr->Item->channel_title);
494                 FreeStrBuf(&rncptr->Item->author_or_creator);
495                 FreeStrBuf(&rncptr->Item->author_url);
496                 FreeStrBuf(&rncptr->Item->author_email);
497
498                 free(rncptr->Item);
499         }
500         free(rncptr);
501 }
502
503 eNextState RSSAggregatorTerminate(AsyncIO *IO)
504 {
505         rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
506         
507         HashPos *At;
508         long HKLen;
509         const char *HK;
510         void *vData;
511
512         pthread_mutex_lock(&RSSQueueMutex);
513         rncptr->RefCount --;
514         if (rncptr->RefCount == 0)
515         {
516                 UnlinkRSSAggregator(rncptr);
517
518         }
519         pthread_mutex_unlock(&RSSQueueMutex);
520
521         At = GetNewHashPos(RSSFetchUrls, 0);
522
523         pthread_mutex_lock(&RSSQueueMutex);
524         GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At);
525         GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData);
526         DeleteEntryFromHash(RSSFetchUrls, At);
527         pthread_mutex_unlock(&RSSQueueMutex);
528
529         DeleteHashPos(&At);
530         return eAbort;
531 }
532
533 /*
534  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
535  */
536 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
537 {
538         StrBuf *CfgData=NULL;
539         StrBuf *CfgType;
540         StrBuf *Line;
541         rss_room_counter *Count = NULL;
542         struct stat statbuf;
543         char filename[PATH_MAX];
544         int  fd;
545         int Done;
546         rss_aggregator *rncptr = NULL;
547         rss_aggregator *use_this_rncptr = NULL;
548         void *vptr;
549         const char *CfgPtr, *lPtr;
550         const char *Err;
551
552         pthread_mutex_lock(&RSSQueueMutex);
553         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
554         {
555                 syslog(LOG_DEBUG, 
556                               "rssclient: [%ld] %s already in progress.\n", 
557                               qrbuf->QRnumber, 
558                               qrbuf->QRname);
559                 pthread_mutex_unlock(&RSSQueueMutex);
560                 return;
561         }
562         pthread_mutex_unlock(&RSSQueueMutex);
563
564         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
565
566         if (server_shutting_down)
567                 return;
568                 
569         /* Only do net processing for rooms that have netconfigs */
570         fd = open(filename, 0);
571         if (fd <= 0) {
572                 //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
573                 return;
574         }
575
576         if (server_shutting_down)
577                 return;
578
579         if (fstat(fd, &statbuf) == -1) {
580                 syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
581                        filename, strerror(errno));
582                 return;
583         }
584
585         if (server_shutting_down)
586                 return;
587
588         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
589
590         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
591                 close(fd);
592                 FreeStrBuf(&CfgData);
593                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
594                         filename, strerror(errno));
595                 return;
596         }
597         close(fd);
598         if (server_shutting_down)
599                 return;
600         
601         CfgPtr = NULL;
602         CfgType = NewStrBuf();
603         Line = NewStrBufPlain(NULL, StrLength(CfgData));
604         Done = 0;
605         while (!Done)
606         {
607             Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
608             if (StrLength(Line) > 0)
609             {
610                 lPtr = NULL;
611                 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
612                 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
613                 {
614                     if (Count == NULL)
615                     {
616                         Count = malloc(sizeof(rss_room_counter));
617                         Count->count = 0;
618                     }
619                     Count->count ++;
620                     rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
621                     memset (rncptr, 0, sizeof(rss_aggregator));
622                     rncptr->roomlist_parts = 1;
623                     rncptr->Url = NewStrBuf();
624                     StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
625
626                     pthread_mutex_lock(&RSSQueueMutex);
627                     GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
628                     use_this_rncptr = (rss_aggregator *)vptr;
629                     if (use_this_rncptr != NULL)
630                     {
631                             /* mustn't attach to an active session */
632                             if (use_this_rncptr->RefCount > 0)
633                             {
634                                     DeleteRssCfg(rncptr);
635                                     Count->count--;
636                             }
637                             else 
638                             {
639                                     long *QRnumber;
640                                     StrBufAppendBufPlain(use_this_rncptr->rooms, 
641                                                          qrbuf->QRname, 
642                                                          -1, 0);
643                                     if (use_this_rncptr->roomlist_parts == 1)
644                                     {
645                                             use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
646                                     }
647                                     QRnumber = (long*)malloc(sizeof(long));
648                                     *QRnumber = qrbuf->QRnumber;
649                                     Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
650                                     use_this_rncptr->roomlist_parts++;
651                             }
652                             pthread_mutex_unlock(&RSSQueueMutex);
653
654
655                             FreeStrBuf(&rncptr->Url);
656                             free(rncptr);
657                             rncptr = NULL;
658                             continue;
659                     }
660                     pthread_mutex_unlock(&RSSQueueMutex);
661
662                     rncptr->ItemType = RSS_UNSET;
663                                 
664                     rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
665
666                     pthread_mutex_lock(&RSSQueueMutex);
667                     Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
668                     pthread_mutex_unlock(&RSSQueueMutex);
669                 }
670             }
671         }
672         if (Count != NULL)
673         {
674                 Count->QRnumber = qrbuf->QRnumber;
675                 pthread_mutex_lock(&RSSQueueMutex);
676                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", 
677                               qrbuf->QRnumber, qrbuf->QRname);
678                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
679                 pthread_mutex_unlock(&RSSQueueMutex);
680         }
681         FreeStrBuf(&CfgData);
682         FreeStrBuf(&CfgType);
683         FreeStrBuf(&Line);
684 }
685
686 /*
687  * Scan for rooms that have RSS client requests configured
688  */
689 void rssclient_scan(void) {
690         static int doing_rssclient = 0;
691         rss_aggregator *rptr = NULL;
692         void *vrptr = NULL;
693         HashPos  *it;
694         long len;
695         const char *Key;
696
697         /* Run no more than once every 15 minutes. */
698         if ((time(NULL) - last_run) < 900) {
699                 return;
700         }
701
702         /*
703          * This is a simple concurrency check to make sure only one rssclient run
704          * is done at a time.  We could do this with a mutex, but since we
705          * don't really require extremely fine granularity here, we'll do it
706          * with a static variable instead.
707          */
708         if (doing_rssclient) return;
709         doing_rssclient = 1;
710
711         syslog(LOG_DEBUG, "rssclient started\n");
712         CtdlForEachRoom(rssclient_scan_room, NULL);
713
714         pthread_mutex_lock(&RSSQueueMutex);
715
716         it = GetNewHashPos(RSSFetchUrls, 0);
717         while (!server_shutting_down &&
718                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && 
719                (vrptr != NULL)) {
720                 rptr = (rss_aggregator *)vrptr;
721                 if (rptr->RefCount == 0) 
722                         if (!rss_do_fetching(rptr))
723                                 UnlinkRSSAggregator(rptr);
724         }
725         DeleteHashPos(&it);
726         pthread_mutex_unlock(&RSSQueueMutex);
727
728         syslog(LOG_DEBUG, "rssclient ended\n");
729         doing_rssclient = 0;
730         return;
731 }
732
733 void rss_cleanup(void)
734 {
735         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
736         DeleteHash(&RSSFetchUrls);
737         DeleteHash(&RSSQueueRooms);
738 }
739
740
741 CTDL_MODULE_INIT(rssclient)
742 {
743         if (threading)
744         {
745                 CtdlFillSystemContext(&rss_CC, "rssclient");
746                 pthread_mutex_init(&RSSQueueMutex, NULL);
747                 RSSQueueRooms = NewHash(1, lFlathash);
748                 RSSFetchUrls = NewHash(1, NULL);
749                 syslog(LOG_INFO, "%s\n", curl_version());
750                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
751                 CtdlRegisterCleanupHook(rss_cleanup);
752         }
753         return "rssclient";
754 }