2 * Bring external RSS feeds into rooms.
4 * Copyright (c) 2007-2010 by the citadel.org team
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/types.h>
42 #include <curl/curl.h>
43 #include <libcitadel.h>
46 #include "citserver.h"
50 #include "ctdl_module.h"
52 #include "parsedate.h"
54 #include "citadel_dirs.h"
57 #include "event_client.h"
58 #include "rss_atom_parser.h"
/*
 * NOTE(review): the TMP_ markers below are not referenced by any code in
 * this excerpt -- confirm they are still used before relying on them.
 */
61 #define TMP_MSGDATA 0xFF
62 #define TMP_SHORTER_URL_OFFSET 0xFE
63 #define TMP_SHORTER_URLS 0xFD
/*
 * Shared RSS-client state. RSSQueueMutex guards both hashes:
 *   RSSQueueRooms - keyed by room number (LKEY), values are rss_room_counter
 *   RSSFetchUrls  - keyed by feed URL (SKEY), values are rss_aggregator,
 *                   stored with DeleteRssCfg() as their destructor
 */
67 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
68 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
69 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
/* Completion callbacks handed to InitcURLIOStruct() in rss_do_fetching(). */
71 eNextState RSSAggregator_Terminate(AsyncIO *IO);
72 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
/* System context filled in at module init; rssclient_scan() switches to it. */
73 struct CitContext rss_CC;
/* NOTE(review): rnclist is not referenced by any code in this excerpt. */
75 struct rssnetcfg *rnclist = NULL;
/*
 * Look up room QRnumber in RSSQueueRooms and, once its rss_room_counter
 * count has reached zero, delete the entry from the hash.
 * NOTE(review): the line that decrements pRoomC->count, any NULL check on
 * vData and the release of the HashPos are not visible in this excerpt --
 * confirm they exist. Callers are presumably expected to hold
 * RSSQueueMutex, since RSSQueueRooms is shared state.
 */
78 void DeleteRoomReference(long QRnumber)
84 rss_room_counter *pRoomC;
86 At = GetNewHashPos(RSSQueueRooms, 0);
88 if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
90 GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
93 pRoomC = (rss_room_counter *) vData;
95 if (pRoomC->count == 0)
96 DeleteEntryFromHash(RSSQueueRooms, At);
/*
 * Release every room reference held by aggregator Cfg: first its primary
 * room (Cfg->QRnumber), then each room number stored (as a long *) in
 * Cfg->OtherQRnumbers, if that hash exists. The loop over the additional
 * rooms stops early once server_shutting_down is set.
 */
102 void UnlinkRooms(rss_aggregator *Cfg)
104 DeleteRoomReference(Cfg->QRnumber);
105 if (Cfg->OtherQRnumbers != NULL)
112 At = GetNewHashPos(Cfg->OtherQRnumbers, 0);
113 while (! server_shutting_down &&
114 GetNextHashPos(Cfg->OtherQRnumbers,
120 long *lData = (long*) vData;
121 DeleteRoomReference(*lData);
/*
 * Remove Cfg from RSSFetchUrls (looked up by its URL) and set last_run to
 * now, which restarts the 15-minute throttle checked by rssclient_scan().
 * The entry was stored with DeleteRssCfg as its destructor, so Cfg is
 * presumably freed here and must not be used afterwards -- confirm.
 * NOTE(review): whether this function locks RSSQueueMutex and calls
 * UnlinkRooms() is not visible in this excerpt. rssclient_scan() calls it
 * while already holding RSSQueueMutex; that mutex is initialised with
 * default attributes, so locking it again here would deadlock (or be
 * undefined) -- confirm.
 */
128 void UnlinkRSSAggregator(rss_aggregator *Cfg)
134 At = GetNewHashPos(RSSFetchUrls, 0);
135 if (GetHashPosFromKey(RSSFetchUrls, SKEY(Cfg->Url), At))
137 DeleteEntryFromHash(RSSFetchUrls, At);
140 last_run = time(NULL);
/*
 * Destructor registered with Put() for RSSFetchUrls entries: releases the
 * string buffers, hashes, recipient room list, pending parse item and
 * async-I/O state owned by one rss_aggregator.
 * NOTE(review): whether RSSAggr->Item and RSSAggr itself are free()d is not
 * visible in this excerpt -- confirm to rule out a leak.
 */
144 void DeleteRssCfg(void *vptr)
146 rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
147 AsyncIO *IO = &RSSAggr->IO; /* presumably consumed by the EVM_syslog() macro */
148 EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
150 FreeStrBuf(&RSSAggr->Url);
151 FreeStrBuf(&RSSAggr->rooms);
152 FreeStrBuf(&RSSAggr->CData);
153 FreeStrBuf(&RSSAggr->Key);
154 DeleteHash(&RSSAggr->OtherQRnumbers);
156 DeleteHashPos (&RSSAggr->Pos);
157 DeleteHash (&RSSAggr->Messages);
158 if (RSSAggr->recp.recp_room != NULL) /* free(NULL) is a no-op, so this guard is redundant */
159 free(RSSAggr->recp.recp_room);
162 if (RSSAggr->Item != NULL)
164 flush_rss_item(RSSAggr->Item);
169 FreeAsyncIOContents(&RSSAggr->IO);
/*
 * Terminate callback registered through InitcURLIOStruct(): logs, then
 * removes the aggregator (IO->Data) from RSSFetchUrls via
 * UnlinkRSSAggregator(). Its return statement is not visible here.
 */
173 eNextState RSSAggregator_Terminate(AsyncIO *IO)
175 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
177 EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
180 UnlinkRSSAggregator(RSSAggr);
/*
 * Shutdown-abort callback registered through InitcURLIOStruct(): logs the
 * URL being abandoned and removes the aggregator (IO->Data) via
 * UnlinkRSSAggregator().
 * NOTE(review): IO->ConnectMe is dereferenced with no NULL check visible
 * here; rss_do_fetching() sets it through ParseURL() -- confirm it is always
 * non-NULL by the time this callback can run.
 */
183 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
186 rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
188 pUrl = IO->ConnectMe->PlainUrl;
192 EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
195 UnlinkRSSAggregator(RSSAggr);
/*
 * Presumably the abort handler for the message-saving stage. It is not
 * implemented yet (see TODO) and unconditionally returns eAbort.
 */
200 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
202 return eAbort; ///TODO
/*
 * Store the current feed item and move on to the next one.
 * Submits RSSAggr->ThisMsg to the rooms in RSSAggr->recp, records its
 * UseTable entry under its GUID in CDB_USETABLE so the item is not stored
 * again, then advances ThisMsg through RSSAggr->Messages. If another message
 * follows, RSS_FetchNetworkUsetableEntry is scheduled for it; what happens
 * once the iteration is exhausted is not visible in this excerpt.
 */
205 eNextState RSSSaveMessage(AsyncIO *IO)
209 rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
/* SmashStrBuf() presumably hands the StrBuf's text over to the message field. */
211 RSSAggr->ThisMsg->Msg.cm_fields['M'] = SmashStrBuf(&RSSAggr->ThisMsg->Message);
213 CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
215 /* write the uidl to the use table so we don't store this item again */
216 cdb_store(CDB_USETABLE,
217 SKEY(RSSAggr->ThisMsg->MsgGUID),
218 &RSSAggr->ThisMsg->ut,
219 sizeof(struct UseTable) );
221 if (GetNextHashPos(RSSAggr->Messages,
224 (void**) &RSSAggr->ThisMsg))
225 return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
/*
 * Use-table check for the current feed item (Ctx->ThisMsg): fills in its
 * UseTable record (GUID and current time) and looks the GUID up in
 * CDB_USETABLE. An already-seen item only gets its record rewritten with
 * the new timestamp, and processing moves on to the next queued message;
 * the other path schedules RSSSaveMessage to store the item. The branch
 * structure and return values are only partly visible in this excerpt.
 */
230 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
234 struct cdbdata *cdbut;
235 rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
237 /* Find out if we've already seen this item */
/*
 * NOTE(review): this strcpy() is unbounded (hence the TODO). The GUID
 * presumably comes from the fetched feed, so an oversized value could
 * overrun ut_msgid if that is a fixed-size array; a bounded copy such as
 * safestrncpy(dst, src, sizeof(dst)) would be safer -- confirm the size.
 */
238 strcpy(Ctx->ThisMsg->ut.ut_msgid,
239 ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
240 Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
242 cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID)); /* NOTE(review): confirm cdbut is released with cdb_free() when non-NULL */
245 /* Item has already been seen */
247 "%s has already been seen\n",
248 ChrPtr(Ctx->ThisMsg->MsgGUID));
251 /* rewrite the record anyway, to update the timestamp */
252 cdb_store(CDB_USETABLE,
253 SKEY(Ctx->ThisMsg->MsgGUID),
254 &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
256 if (GetNextHashPos(Ctx->Messages,
259 (void**) &Ctx->ThisMsg))
260 return NextDBOperation(
262 RSS_FetchNetworkUsetableEntry);
269 NextDBOperation(IO, RSSSaveMessage); /* NOTE(review): unlike the calls above, this result is not returned here; confirm the following return is intended */
/*
 * Start an asynchronous fetch of Cfg's feed, unless Cfg->next_poll is set
 * and has not been reached yet. Allocates a zeroed rss_item for the parser,
 * initialises the libcurl I/O context with the parse, terminate and
 * shutdown-abort callbacks, copies a host string into the I/O context's
 * cs_host, parses the feed URL (80 passed as the default port) and queues
 * the request. rssclient_scan() unlinks Cfg when this returns 0; the return
 * statements themselves are not visible in this excerpt.
 */
279 int rss_do_fetching(rss_aggregator *Cfg)
286 if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
/*
 * NOTE(review): malloc() is not checked for NULL before memset(); also
 * confirm ri is attached to Cfg (e.g. Cfg->Item) and released on the
 * InitcURLIOStruct() failure path so it is not leaked.
 */
289 ri = (rss_item*) malloc(sizeof(rss_item));
290 memset(ri, 0, sizeof(rss_item));
293 if (! InitcURLIOStruct(&Cfg->IO,
295 "Citadel RSS Client",
296 RSSAggregator_ParseReply,
297 RSSAggregator_Terminate,
298 RSSAggregator_ShutdownAbort))
300 syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
304 safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host,
306 sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host));
308 syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url));
309 ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80); /* NOTE(review): any failure here is not checked before CurlPrepareURL() */
310 CurlPrepareURL(Cfg->IO.ConnectMe);
312 QueueCurlContext(&Cfg->IO);
317 * Scan a room's netconfig to determine whether it is requesting any RSS feeds
/*
 * Per-room callback for CtdlForEachRoom(); data is unused in this excerpt.
 * Skips the room when it is already in RSSQueueRooms (logged as "already in
 * progress") or has no readable netconfig file, and gives up at several
 * points once server_shutting_down is set. Each netconfig line whose first
 * '|'-separated token is "rssclient" names a feed URL as its next token:
 *  - if RSSFetchUrls already holds an aggregator for that URL, this room is
 *    appended to that aggregator's room list and its number is added to the
 *    aggregator's OtherQRnumbers hash (created for the first extra room);
 *  - otherwise a new rss_aggregator for the URL is stored in RSSFetchUrls.
 * The room is then recorded in RSSQueueRooms through an rss_room_counter.
 * NOTE(review): whether fd and Line are released is not visible here.
 */
319 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
321 StrBuf *CfgData=NULL;
324 rss_room_counter *Count = NULL;
326 char filename[PATH_MAX];
329 rss_aggregator *RSSAggr = NULL;
330 rss_aggregator *use_this_RSSAggr = NULL;
332 const char *CfgPtr, *lPtr;
335 pthread_mutex_lock(&RSSQueueMutex);
336 if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
339 "rssclient: [%ld] %s already in progress.\n",
342 pthread_mutex_unlock(&RSSQueueMutex);
345 pthread_mutex_unlock(&RSSQueueMutex);
347 assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
349 if (server_shutting_down)
352 /* Only do net processing for rooms that have netconfigs */
353 fd = open(filename, 0);
356 "rssclient: %s no config.\n",
361 if (server_shutting_down)
364 if (fstat(fd, &statbuf) == -1) {
366 "ERROR: could not stat configfile '%s' - %s\n",
372 if (server_shutting_down)
375 CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
377 if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
379 FreeStrBuf(&CfgData);
380 syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s<br>\n",
381 filename, strerror(errno)); /* NOTE(review): errno is read after FreeStrBuf(); save it first, or log the Err value StrBufReadBLOB() was given */
385 if (server_shutting_down)
389 CfgType = NewStrBuf();
390 Line = NewStrBufPlain(NULL, StrLength(CfgData));
394 Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
395 if (StrLength(Line) > 0)
398 StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
399 if (!strcasecmp("rssclient", ChrPtr(CfgType)))
/* NOTE(review): the malloc() results for Count, RSSAggr and QRnumber are not checked for NULL. */
403 Count = malloc(sizeof(rss_room_counter));
407 RSSAggr = (rss_aggregator *) malloc(sizeof(rss_aggregator));
408 memset (RSSAggr, 0, sizeof(rss_aggregator));
409 RSSAggr->roomlist_parts = 1;
410 RSSAggr->Url = NewStrBuf();
411 StrBufExtract_NextToken(RSSAggr->Url, Line, &lPtr, '|');
/*
 * NOTE(review): the URL lookup below and the Put() into RSSFetchUrls further
 * down run in separate RSSQueueMutex critical sections; this relies on only
 * one scan running at a time (the doing_rssclient check in rssclient_scan()).
 */
413 pthread_mutex_lock(&RSSQueueMutex);
414 GetHash(RSSFetchUrls, SKEY(RSSAggr->Url), &vptr);
415 use_this_RSSAggr = (rss_aggregator *)vptr;
416 if (use_this_RSSAggr != NULL)
419 StrBufAppendBufPlain(use_this_RSSAggr->rooms,
422 if (use_this_RSSAggr->roomlist_parts == 1)
424 use_this_RSSAggr->OtherQRnumbers =
425 NewHash(1, lFlathash);
427 QRnumber = (long*)malloc(sizeof(long));
428 *QRnumber = qrbuf->QRnumber;
429 Put(use_this_RSSAggr->OtherQRnumbers,
430 LKEY(qrbuf->QRnumber),
433 use_this_RSSAggr->roomlist_parts++;
435 pthread_mutex_unlock(&RSSQueueMutex);
437 FreeStrBuf(&RSSAggr->Url);
442 pthread_mutex_unlock(&RSSQueueMutex);
444 RSSAggr->ItemType = RSS_UNSET;
446 RSSAggr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
448 pthread_mutex_lock(&RSSQueueMutex);
449 Put(RSSFetchUrls, SKEY(RSSAggr->Url), RSSAggr, DeleteRssCfg);
450 pthread_mutex_unlock(&RSSQueueMutex);
456 Count->QRnumber = qrbuf->QRnumber;
457 pthread_mutex_lock(&RSSQueueMutex);
458 syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n",
459 qrbuf->QRnumber, qrbuf->QRname);
/* NOTE(review): Count is stored with a NULL destructor; confirm it is freed when the entry is removed. */
460 Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
461 pthread_mutex_unlock(&RSSQueueMutex);
463 FreeStrBuf(&CfgData);
464 FreeStrBuf(&CfgType);
469 * Scan for rooms that have RSS client requests configured
/*
 * Timer hook registered in CTDL_MODULE_INIT. Runs at most once every 900 s
 * (measured from last_run) and never while doing_rssclient is set. It
 * presumably also skips the run while RSSQueueRooms or RSSFetchUrls still
 * hold entries from an earlier run; the action taken on that check is not
 * visible here. It then switches to rss_CC, collects feeds from every room
 * with rssclient_scan_room(), and, holding RSSQueueMutex, starts a fetch for
 * each aggregator, unlinking those for which rss_do_fetching() returns 0.
 */
471 void rssclient_scan(void) {
472 static int doing_rssclient = 0;
473 rss_aggregator *rptr = NULL;
479 /* Run no more than once every 15 minutes. */
480 if ((time(NULL) - last_run) < 900) {
485 * This is a simple concurrency check to make sure only one rssclient
486 * run is done at a time. We could do this with a mutex, but since we
487 * don't really require extremely fine granularity here, we'll do it
488 * with a static variable instead.
490 if (doing_rssclient) return;
/* NOTE(review): both counts are read here without RSSQueueMutex held. */
492 if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
495 become_session(&rss_CC);
496 syslog(LOG_DEBUG, "rssclient started\n");
497 CtdlForEachRoom(rssclient_scan_room, NULL);
499 pthread_mutex_lock(&RSSQueueMutex);
501 it = GetNewHashPos(RSSFetchUrls, 0);
502 while (!server_shutting_down &&
503 GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
505 rptr = (rss_aggregator *)vrptr;
/*
 * NOTE(review): UnlinkRSSAggregator() deletes from RSSFetchUrls while `it`
 * is iterating that hash, and is called here with RSSQueueMutex held --
 * confirm both are safe (iterator validity, and that it does not lock the
 * mutex again).
 */
506 if (!rss_do_fetching(rptr))
507 UnlinkRSSAggregator(rptr);
510 pthread_mutex_unlock(&RSSQueueMutex);
512 syslog(LOG_DEBUG, "rssclient ended\n");
/*
 * Cleanup hook: deletes both RSS hashes. Deleting RSSFetchUrls presumably
 * runs the DeleteRssCfg destructor on every remaining aggregator.
 * RSSQueueMutex is not destroyed yet (see TODO). NOTE(review): the
 * commented-out call names citthread_mutex_destroy, but the mutex is a
 * pthread_mutex_t, so pthread_mutex_destroy() would be the matching call.
 */
517 void rss_cleanup(void)
519 /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
520 DeleteHash(&RSSFetchUrls);
521 DeleteHash(&RSSQueueRooms);
/*
 * Module entry point. It:
 *  - prepares the rss_CC system context;
 *  - initialises the queue mutex with default attributes;
 *  - creates both hashes: RSSQueueRooms with lFlathash for its long
 *    room-number keys, and RSSFetchUrls with a NULL hash function
 *    (presumably libcitadel's default string hash) for its URL keys;
 *  - logs the libcurl version;
 *  - registers rssclient_scan() as an EVT_TIMER session hook and
 *    rss_cleanup() as a cleanup hook.
 * Any enclosing threading check and the return value are not visible here.
 */
525 CTDL_MODULE_INIT(rssclient)
529 CtdlFillSystemContext(&rss_CC, "rssclient");
530 pthread_mutex_init(&RSSQueueMutex, NULL);
531 RSSQueueRooms = NewHash(1, lFlathash);
532 RSSFetchUrls = NewHash(1, NULL);
533 syslog(LOG_INFO, "%s\n", curl_version());
534 CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER);
535 CtdlRegisterCleanupHook(rss_cleanup);