fix some more memleaks.
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
1 /*
2  * Bring external RSS feeds into rooms.
3  *
4  * Copyright (c) 2007-2010 by the citadel.org team
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35
36 #include <ctype.h>
37 #include <string.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <expat.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
44 #include "citadel.h"
45 #include "server.h"
46 #include "citserver.h"
47 #include "support.h"
48 #include "config.h"
49 #include "threads.h"
50 #include "ctdl_module.h"
51 #include "msgbase.h"
52 #include "parsedate.h"
53 #include "database.h"
54 #include "citadel_dirs.h"
55 #include "md5.h"
56 #include "context.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
59
60
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
64
65 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
66 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
67 HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */
68
69 eNextState RSSAggregatorTerminate(AsyncIO *IO);
70
71 struct CitContext rss_CC;
72
73 struct rssnetcfg *rnclist = NULL;
74 void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title)
75 {
76         if (StrLength(link) > 0)
77         {
78                 StrBufAppendBufPlain(Message, HKEY("<a href=\""), 0);
79                 StrBufAppendBuf(Message, link, 0);
80                 StrBufAppendBufPlain(Message, HKEY("\">"), 0);
81                 if (StrLength(LinkTitle) > 0)
82                         StrBufAppendBuf(Message, LinkTitle, 0);
83                 else if ((Title != NULL) && !IsEmptyStr(Title))
84                         StrBufAppendBufPlain(Message, Title, -1, 0);
85                 else
86                         StrBufAppendBuf(Message, link, 0);
87                 StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
88         }
89 }
90 typedef struct __networker_save_message {
91         AsyncIO IO;
92         struct CtdlMessage *Msg;
93         struct recptypes *recp;
94         rss_aggregator *Cfg;
95         StrBuf *MsgGUID;
96         StrBuf *Message;
97         struct UseTable ut;
98 } networker_save_message;
99
100
101 void DeleteRoomReference(long QRnumber)
102 {
103         HashPos *At;
104         long HKLen;
105         const char *HK;
106         void *vData = NULL;
107         rss_room_counter *pRoomC;
108
109         At = GetNewHashPos(RSSQueueRooms, 0);
110
111         GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At);
112         GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
113         if (vData != NULL)
114         {
115                 pRoomC = (rss_room_counter *) vData;
116                 pRoomC->count --;
117                 if (pRoomC->count == 0)
118                         DeleteEntryFromHash(RSSQueueRooms, At);
119         }
120         DeleteHashPos(&At);
121 }
122
123 void UnlinkRooms(rss_aggregator *Cfg)
124 {
125         
126         DeleteRoomReference(Cfg->QRnumber);
127         if (Cfg->OtherQRnumbers != NULL)
128         {
129                 long HKLen;
130                 const char *HK;
131                 HashPos *At;
132                 void *vData;
133
134                 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
135                 while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) && 
136                        (vData != NULL))
137                 {
138                         long *lData = (long*) vData;
139                         DeleteRoomReference(*lData);
140                 }
141 /*
142                 if (server_shutting_down)
143                         break; /* TODO */
144
145                 DeleteHashPos(&At);
146         }
147 }
148
149 void UnlinkRSSAggregator(rss_aggregator *Cfg)
150 {
151         HashPos *At;
152
153         UnlinkRooms(Cfg);
154
155         At = GetNewHashPos(RSSFetchUrls, 0);
156         if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At) == 0)
157         {
158                 DeleteEntryFromHash(RSSFetchUrls, At);
159         }
160         DeleteHashPos(&At);
161 }
162
163 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
164 {
165         networker_save_message *Ctx = (networker_save_message *) IO->Data;
166
167         pthread_mutex_lock(&RSSQueueMutex);
168         Ctx->Cfg->RefCount --;
169
170         if (Ctx->Cfg->RefCount == 0)
171         {
172                 UnlinkRSSAggregator(Ctx->Cfg);
173
174         }
175         pthread_mutex_unlock(&RSSQueueMutex);
176
177         CtdlFreeMessage(Ctx->Msg);
178         free_recipients(Ctx->recp);
179         FreeStrBuf(&Ctx->Message);
180         FreeStrBuf(&Ctx->MsgGUID);
181         free(Ctx);
182         return eAbort;
183 }
184
185 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
186 {
187         return eAbort; ///TODO
188 }
189
190 eNextState RSSSaveMessage(AsyncIO *IO)
191 {
192         networker_save_message *Ctx = (networker_save_message *) IO->Data;
193
194         Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message);
195
196         CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0);
197
198         /* write the uidl to the use table so we don't store this item again */
199         cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) );
200
201         return eTerminateConnection;
202 }
203
204 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
205 {
206         struct cdbdata *cdbut;
207         networker_save_message *Ctx = (networker_save_message *) IO->Data;
208
209         /* Find out if we've already seen this item */
210         strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
211         Ctx->ut.ut_timestamp = time(NULL);
212
213         cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
214 #ifndef DEBUG_RSS
215         if (cdbut != NULL) {
216                 /* Item has already been seen */
217                 syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
218                 cdb_free(cdbut);
219
220                 /* rewrite the record anyway, to update the timestamp */
221                 cdb_store(CDB_USETABLE, 
222                           SKEY(Ctx->MsgGUID), 
223                           &Ctx->ut, sizeof(struct UseTable) );
224                 return eAbort;
225         }
226         else
227 #endif
228         {
229                 NextDBOperation(IO, RSSSaveMessage);
230                 return eSendMore;
231         }
232 }
233 void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregator *Cfg)
234 {
235         networker_save_message *Ctx;
236
237         Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
238         memset(Ctx, 0, sizeof(networker_save_message));
239         
240         Ctx->MsgGUID = MsgGUID;
241         Ctx->Message = MessageBody;
242         Ctx->Msg = Msg;
243         Ctx->Cfg = Cfg;
244         Ctx->recp = recp;
245         Ctx->IO.Data = Ctx;
246         Ctx->IO.CitContext = CloneContext(&rss_CC);
247         Ctx->IO.Terminate = FreeNetworkSaveMessage;
248         Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
249         QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
250 }
251
252
253 /*
254  * Commit a fetched and parsed RSS item to disk
255  */
256 void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
257 {
258
259         struct MD5Context md5context;
260         u_char rawdigest[MD5_DIGEST_LEN];
261         struct CtdlMessage *msg;
262         struct recptypes *recp = NULL;
263         int msglen = 0;
264         StrBuf *Message;
265         StrBuf *guid;
266         StrBuf *Buf;
267
268         recp = (struct recptypes *) malloc(sizeof(struct recptypes));
269         if (recp == NULL) return;
270         memset(recp, 0, sizeof(struct recptypes));
271         Buf = NewStrBufDup(Cfg->rooms);
272         recp->recp_room = SmashStrBuf(&Buf);
273         recp->num_room = Cfg->roomlist_parts;
274         recp->recptypes_magic = RECPTYPES_MAGIC;
275    
276         Cfg->RefCount ++;
277         /* Construct a GUID to use in the S_USETABLE table.
278          * If one is not present in the item itself, make one up.
279          */
280         if (ri->guid != NULL) {
281                 StrBufSpaceToBlank(ri->guid);
282                 StrBufTrim(ri->guid);
283                 guid = NewStrBufPlain(HKEY("rss/"));
284                 StrBufAppendBuf(guid, ri->guid, 0);
285         }
286         else {
287                 MD5Init(&md5context);
288                 if (ri->title != NULL) {
289                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title));
290                 }
291                 if (ri->link != NULL) {
292                         MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
293                 }
294                 MD5Final(rawdigest, &md5context);
295                 guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/);
296                 StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN);
297                 StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0);
298         }
299
300         /* translate Item into message. */
301         syslog(LOG_DEBUG, "RSS: translating item...\n");
302         if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
303         StrBufSpaceToBlank(ri->description);
304         msg = malloc(sizeof(struct CtdlMessage));
305         memset(msg, 0, sizeof(struct CtdlMessage));
306         msg->cm_magic = CTDLMESSAGE_MAGIC;
307         msg->cm_anon_type = MES_NORMAL;
308         msg->cm_format_type = FMT_RFC822;
309
310         if (ri->guid != NULL) {
311                 msg->cm_fields['E'] = strdup(ChrPtr(ri->guid));
312         }
313
314         if (ri->author_or_creator != NULL) {
315                 char *From;
316                 StrBuf *Encoded = NULL;
317                 int FromAt;
318                         
319                 From = html_to_ascii(ChrPtr(ri->author_or_creator),
320                                      StrLength(ri->author_or_creator), 
321                                      512, 0);
322                 StrBufPlain(ri->author_or_creator, From, -1);
323                 StrBufTrim(ri->author_or_creator);
324                 free(From);
325
326                 FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL;
327                 if (!FromAt && StrLength (ri->author_email) > 0)
328                 {
329                         StrBufRFC2047encode(&Encoded, ri->author_or_creator);
330                         msg->cm_fields['A'] = SmashStrBuf(&Encoded);
331                         msg->cm_fields['P'] = SmashStrBuf(&ri->author_email);
332                 }
333                 else
334                 {
335                         if (FromAt)
336                         {
337                                 msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator);
338                                 msg->cm_fields['P'] = strdup(msg->cm_fields['A']);
339                         }
340                         else 
341                         {
342                                 StrBufRFC2047encode(&Encoded, ri->author_or_creator);
343                                 msg->cm_fields['A'] = SmashStrBuf(&Encoded);
344                                 msg->cm_fields['P'] = strdup("rss@localhost");
345
346                         }
347                         if (ri->pubdate <= 0) {
348                                 ri->pubdate = time(NULL);
349                         }
350                 }
351         }
352         else {
353                 msg->cm_fields['A'] = strdup("rss");
354         }
355
356         msg->cm_fields['N'] = strdup(NODENAME);
357         if (ri->title != NULL) {
358                 long len;
359                 char *Sbj;
360                 StrBuf *Encoded, *QPEncoded;
361
362                 QPEncoded = NULL;
363                 StrBufSpaceToBlank(ri->title);
364                 len = StrLength(ri->title);
365                 Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0);
366                 len = strlen(Sbj);
367                 if (Sbj[len - 1] == '\n')
368                 {
369                         len --;
370                         Sbj[len] = '\0';
371                 }
372                 Encoded = NewStrBufPlain(Sbj, len);
373                 free(Sbj);
374
375                 StrBufTrim(Encoded);
376                 StrBufRFC2047encode(&QPEncoded, Encoded);
377
378                 msg->cm_fields['U'] = SmashStrBuf(&QPEncoded);
379                 FreeStrBuf(&Encoded);
380         }
381         msg->cm_fields['T'] = malloc(64);
382         snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate);
383         if (ri->channel_title != NULL) {
384                 if (StrLength(ri->channel_title) > 0) {
385                         msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title));
386                 }
387         }
388         if (ri->link == NULL) 
389                 ri->link = NewStrBufPlain(HKEY(""));
390
391 #if 0 /* temporarily disable shorter urls. */
392         msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
393 #endif
394
395         msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
396
397         Message = NewStrBufPlain(NULL, StrLength(ri->description));
398
399         StrBufPlain(Message, HKEY(
400                             "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
401                             "<html><body>\n"));
402 #if 0 /* disable shorter url for now. */
403         msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
404 #endif
405         StrBufAppendBuf(Message, ri->description, 0);
406         StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
407
408         AppendLink(Message, ri->link, ri->linkTitle, NULL);
409         AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
410         StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
411
412         RSSQueueSaveMessage(msg, recp, guid, Message, Cfg);
413 }
414
415
416
417 /*
418  * Begin a feed parse
419  */
420 int rss_do_fetching(rss_aggregator *Cfg)
421 {
422         rss_item *ri;
423                 
424         time_t now;
425         AsyncIO *IO;
426
427         now = time(NULL);
428
429         if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
430                 return 0;
431         Cfg->RefCount = 1;
432
433         ri = (rss_item*) malloc(sizeof(rss_item));
434         memset(ri, 0, sizeof(rss_item));
435         Cfg->Item = ri;
436         IO = &Cfg->IO;
437         IO->CitContext = CloneContext(&rss_CC);
438         IO->Data = Cfg;
439
440
441         syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
442         ParseURL(&IO->ConnectMe, Cfg->Url, 80);
443         CurlPrepareURL(IO->ConnectMe);
444
445         if (! evcurl_init(IO, 
446 //                        Ctx, 
447                           NULL,
448                           "Citadel RSS Client",
449                           ParseRSSReply, 
450                           RSSAggregatorTerminate))
451         {
452                 syslog(LOG_DEBUG, "Unable to initialize libcurl.\n");
453                 return 0;
454         }
455
456         evcurl_handle_start(IO);
457         return 1;
458 }
459
460
461 void DeleteRssCfg(void *vptr)
462 {
463         rss_aggregator *rncptr = (rss_aggregator *)vptr;
464
465         FreeStrBuf(&rncptr->Url);
466         FreeStrBuf(&rncptr->rooms);
467         FreeStrBuf(&rncptr->CData);
468         FreeStrBuf(&rncptr->Key);
469         FreeStrBuf(&rncptr->IO.HttpReq.ReplyData);
470         DeleteHash(&rncptr->OtherQRnumbers);
471         FreeURL(&rncptr->IO.ConnectMe);
472
473         if (rncptr->Item != NULL)
474         {
475                 FreeStrBuf(&rncptr->Item->guid);
476                 FreeStrBuf(&rncptr->Item->title);
477                 FreeStrBuf(&rncptr->Item->link);
478                 FreeStrBuf(&rncptr->Item->linkTitle);
479                 FreeStrBuf(&rncptr->Item->reLink);
480                 FreeStrBuf(&rncptr->Item->reLinkTitle);
481                 FreeStrBuf(&rncptr->Item->description);
482                 FreeStrBuf(&rncptr->Item->channel_title);
483                 FreeStrBuf(&rncptr->Item->author_or_creator);
484                 FreeStrBuf(&rncptr->Item->author_url);
485                 FreeStrBuf(&rncptr->Item->author_email);
486
487                 free(rncptr->Item);
488         }
489         free(rncptr);
490 }
491
492 eNextState RSSAggregatorTerminate(AsyncIO *IO)
493 {
494         rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
495         /*
496           HashPos *At;
497           long HKLen;
498           const char *HK;
499           void *vData;
500         */
501         pthread_mutex_lock(&RSSQueueMutex);
502         rncptr->RefCount --;
503         if (rncptr->RefCount == 0)
504         {
505                 UnlinkRSSAggregator(rncptr);
506
507         }
508         pthread_mutex_unlock(&RSSQueueMutex);
509 /*
510         At = GetNewHashPos(RSSFetchUrls, 0);
511
512         pthread_mutex_lock(&RSSQueueMutex);
513         GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At);
514         GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData);
515         DeleteEntryFromHash(RSSFetchUrls, At);
516         pthread_mutex_unlock(&RSSQueueMutex);
517
518         DeleteHashPos(&At);
519 */
520         return eAbort;
521 }
522
523 /*
524  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
525  */
526 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
527 {
528         StrBuf *CfgData=NULL;
529         StrBuf *CfgType;
530         StrBuf *Line;
531         rss_room_counter *Count = NULL;
532         struct stat statbuf;
533         char filename[PATH_MAX];
534         int  fd;
535         int Done;
536         rss_aggregator *rncptr = NULL;
537         rss_aggregator *use_this_rncptr = NULL;
538         void *vptr;
539         const char *CfgPtr, *lPtr;
540         const char *Err;
541
542         pthread_mutex_lock(&RSSQueueMutex);
543         if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
544         {
545                 syslog(LOG_DEBUG, 
546                               "rssclient: [%ld] %s already in progress.\n", 
547                               qrbuf->QRnumber, 
548                               qrbuf->QRname);
549                 pthread_mutex_unlock(&RSSQueueMutex);
550                 return;
551         }
552         pthread_mutex_unlock(&RSSQueueMutex);
553
554         assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
555
556         if (server_shutting_down)
557                 return;
558                 
559         /* Only do net processing for rooms that have netconfigs */
560         fd = open(filename, 0);
561         if (fd <= 0) {
562                 //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
563                 return;
564         }
565
566         if (server_shutting_down)
567                 return;
568
569         if (fstat(fd, &statbuf) == -1) {
570                 syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
571                        filename, strerror(errno));
572                 return;
573         }
574
575         if (server_shutting_down)
576                 return;
577
578         CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
579
580         if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
581                 close(fd);
582                 FreeStrBuf(&CfgData);
583                 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
584                         filename, strerror(errno));
585                 return;
586         }
587         close(fd);
588         if (server_shutting_down)
589                 return;
590         
591         CfgPtr = NULL;
592         CfgType = NewStrBuf();
593         Line = NewStrBufPlain(NULL, StrLength(CfgData));
594         Done = 0;
595         while (!Done)
596         {
597             Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
598             if (StrLength(Line) > 0)
599             {
600                 lPtr = NULL;
601                 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
602                 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
603                 {
604                     if (Count == NULL)
605                     {
606                         Count = malloc(sizeof(rss_room_counter));
607                         Count->count = 0;
608                     }
609                     Count->count ++;
610                     rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
611                     memset (rncptr, 0, sizeof(rss_aggregator));
612                     rncptr->roomlist_parts = 1;
613                     rncptr->Url = NewStrBuf();
614                     StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|');
615
616                     pthread_mutex_lock(&RSSQueueMutex);
617                     GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr);
618                     use_this_rncptr = (rss_aggregator *)vptr;
619                     if (use_this_rncptr != NULL)
620                     {
621                             /* mustn't attach to an active session */
622                             if (use_this_rncptr->RefCount > 0)
623                             {
624                                     DeleteRssCfg(rncptr);
625                                     Count->count--;
626                             }
627                             else 
628                             {
629                                     long *QRnumber;
630                                     StrBufAppendBufPlain(use_this_rncptr->rooms, 
631                                                          qrbuf->QRname, 
632                                                          -1, 0);
633                                     if (use_this_rncptr->roomlist_parts == 1)
634                                     {
635                                             use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
636                                     }
637                                     QRnumber = (long*)malloc(sizeof(long));
638                                     *QRnumber = qrbuf->QRnumber;
639                                     Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
640                                     use_this_rncptr->roomlist_parts++;
641                             }
642                             pthread_mutex_unlock(&RSSQueueMutex);
643
644
645                             FreeStrBuf(&rncptr->Url);       
646                             free(rncptr);
647                             rncptr = NULL;
648                             continue;
649                     }
650                     pthread_mutex_unlock(&RSSQueueMutex);
651
652                     rncptr->ItemType = RSS_UNSET;
653                                 
654                     rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
655
656                     pthread_mutex_lock(&RSSQueueMutex);
657                     Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg);
658                     pthread_mutex_unlock(&RSSQueueMutex);
659                 }
660             }
661         }
662         if (Count != NULL)
663         {
664                 Count->QRnumber = qrbuf->QRnumber;
665                 pthread_mutex_lock(&RSSQueueMutex);
666                 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", 
667                               qrbuf->QRnumber, qrbuf->QRname);
668                 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
669                 pthread_mutex_unlock(&RSSQueueMutex);
670         }
671         FreeStrBuf(&CfgData);
672         FreeStrBuf(&CfgType);
673         FreeStrBuf(&Line);
674 }
675
676 /*
677  * Scan for rooms that have RSS client requests configured
678  */
679 void rssclient_scan(void) {
680         static int doing_rssclient = 0;
681         rss_aggregator *rptr = NULL;
682         void *vrptr = NULL;
683         HashPos  *it;
684         long len;
685         const char *Key;
686
687         /* Run no more than once every 15 minutes. * /
688         if ((time(NULL) - last_run) < 900) {
689                 return;
690         }
691 */
692         /*
693          * This is a simple concurrency check to make sure only one rssclient run
694          * is done at a time.  We could do this with a mutex, but since we
695          * don't really require extremely fine granularity here, we'll do it
696          * with a static variable instead.
697          */
698         if (doing_rssclient) return;
699         doing_rssclient = 1;
700
701         syslog(LOG_DEBUG, "rssclient started\n");
702         CtdlForEachRoom(rssclient_scan_room, NULL);
703
704         pthread_mutex_lock(&RSSQueueMutex);
705
706         it = GetNewHashPos(RSSFetchUrls, 0);
707         while (!server_shutting_down &&
708                GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && 
709                (vrptr != NULL)) {
710                 rptr = (rss_aggregator *)vrptr;
711                 if (rptr->RefCount == 0) 
712                         if (!rss_do_fetching(rptr))
713                                 UnlinkRSSAggregator(rptr);
714         }
715         DeleteHashPos(&it);
716         pthread_mutex_unlock(&RSSQueueMutex);
717
718         syslog(LOG_DEBUG, "rssclient ended\n");
719         doing_rssclient = 0;
720         return;
721 }
722
723 void rss_cleanup(void)
724 {
725         /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
726         DeleteHash(&RSSFetchUrls);
727         DeleteHash(&RSSQueueRooms);
728 }
729
730
731 CTDL_MODULE_INIT(rssclient)
732 {
733         if (threading)
734         {
735                 CtdlFillSystemContext(&rss_CC, "rssclient");
736                 pthread_mutex_init(&RSSQueueMutex, NULL);
737                 RSSQueueRooms = NewHash(1, lFlathash);
738                 RSSFetchUrls = NewHash(1, NULL);
739                 syslog(LOG_INFO, "%s\n", curl_version());
740                 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
741                 CtdlRegisterCleanupHook(rss_cleanup);
742         }
743         return "rssclient";
744 }