X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Frssclient%2Fserv_rssclient.c;h=e2e9aea12ec9bf8db65213a8b787012234485dab;hb=192056a6112602350c1f8a73eae2e134a31c7ba2;hp=db13c62cc30bd23eb4d5863d5bbae4aa62c755db;hpb=32da30ae02d4173e3169d9aa688bc33af9f1f384;p=citadel.git diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index db13c62cc..e2e9aea12 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -62,12 +62,15 @@ #define TMP_SHORTER_URL_OFFSET 0xFE #define TMP_SHORTER_URLS 0xFD -citthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ +time_t last_run = 0L; + +pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ HashList *RSSQueueRooms = NULL; /* rss_room_counter */ HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */ eNextState RSSAggregatorTerminate(AsyncIO *IO); +struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title) @@ -137,10 +140,12 @@ void UnlinkRooms(rss_aggregator *Cfg) long *lData = (long*) vData; DeleteRoomReference(*lData); } +/* + if (server_shutting_down) + break; / * TODO */ DeleteHashPos(&At); } - } void UnlinkRSSAggregator(rss_aggregator *Cfg) @@ -155,13 +160,14 @@ void UnlinkRSSAggregator(rss_aggregator *Cfg) DeleteEntryFromHash(RSSFetchUrls, At); } DeleteHashPos(&At); + last_run = time(NULL); } eNextState FreeNetworkSaveMessage (AsyncIO *IO) { networker_save_message *Ctx = (networker_save_message *) IO->Data; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); Ctx->Cfg->RefCount --; if (Ctx->Cfg->RefCount == 0) @@ -169,13 +175,16 @@ eNextState FreeNetworkSaveMessage (AsyncIO *IO) UnlinkRSSAggregator(Ctx->Cfg); } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); CtdlFreeMessage(Ctx->Msg); free_recipients(Ctx->recp); FreeStrBuf(&Ctx->Message); FreeStrBuf(&Ctx->MsgGUID); + ((struct CitContext*)IO->CitContext)->state = CON_IDLE; + ((struct CitContext*)IO->CitContext)->kill_me = 1; free(Ctx); + last_run = time(NULL); return eAbort; } @@ -198,8 +207,6 @@ eNextState RSSSaveMessage(AsyncIO *IO) return eTerminateConnection; } -// TODO: relink me: ExpandShortUrls(ri->description); - eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) { struct cdbdata *cdbut; @@ -213,7 +220,7 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) #ifndef DEBUG_RSS if (cdbut != NULL) { /* Item has already been seen */ - CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID)); + syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID)); cdb_free(cdbut); /* rewrite the record anyway, to update the timestamp */ @@ -242,7 +249,7 @@ void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf Ctx->Cfg = Cfg; Ctx->recp = recp; Ctx->IO.Data = Ctx; - Ctx->IO.CitContext = CloneContext(CC); + Ctx->IO.CitContext = CloneContext(&rss_CC); Ctx->IO.Terminate = FreeNetworkSaveMessage; Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage; QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry); @@ -297,7 +304,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) } /* translate Item into message. */ - CtdlLogPrintf(CTDL_DEBUG, "RSS: translating item...\n"); + syslog(LOG_DEBUG, "RSS: translating item...\n"); if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY("")); StrBufSpaceToBlank(ri->description); msg = malloc(sizeof(struct CtdlMessage)); @@ -332,12 +339,19 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) else { if (FromAt) - msg->cm_fields['P'] = SmashStrBuf(&ri->author_or_creator); + { + msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator); + msg->cm_fields['P'] = strdup(msg->cm_fields['A']); + } else { StrBufRFC2047encode(&Encoded, ri->author_or_creator); msg->cm_fields['A'] = SmashStrBuf(&Encoded); msg->cm_fields['P'] = strdup("rss@localhost"); + + } + if (ri->pubdate <= 0) { + ri->pubdate = time(NULL); } } } @@ -426,11 +440,11 @@ int rss_do_fetching(rss_aggregator *Cfg) memset(ri, 0, sizeof(rss_item)); Cfg->Item = ri; IO = &Cfg->IO; - IO->CitContext = CloneContext(CC); + IO->CitContext = CloneContext(&rss_CC); IO->Data = Cfg; - CtdlLogPrintf(CTDL_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url)); + syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url)); ParseURL(&IO->ConnectMe, Cfg->Url, 80); CurlPrepareURL(IO->ConnectMe); @@ -441,16 +455,15 @@ int rss_do_fetching(rss_aggregator *Cfg) ParseRSSReply, RSSAggregatorTerminate)) { - CtdlLogPrintf(CTDL_ALERT, "Unable to initialize libcurl.\n"); + syslog(LOG_DEBUG, "Unable to initialize libcurl.\n"); return 0; } - evcurl_handle_start(IO); + QueueCurlContext(IO); return 1; } - void DeleteRssCfg(void *vptr) { rss_aggregator *rncptr = (rss_aggregator *)vptr; @@ -459,8 +472,9 @@ void DeleteRssCfg(void *vptr) FreeStrBuf(&rncptr->rooms); FreeStrBuf(&rncptr->CData); FreeStrBuf(&rncptr->Key); - + FreeStrBuf(&rncptr->IO.HttpReq.ReplyData); DeleteHash(&rncptr->OtherQRnumbers); + FreeURL(&rncptr->IO.ConnectMe); if (rncptr->Item != NULL) { @@ -484,30 +498,30 @@ void DeleteRssCfg(void *vptr) eNextState RSSAggregatorTerminate(AsyncIO *IO) { rss_aggregator *rncptr = (rss_aggregator *)IO->Data; + HashPos *At; long HKLen; const char *HK; void *vData; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); rncptr->RefCount --; if (rncptr->RefCount == 0) { UnlinkRSSAggregator(rncptr); } - citthread_mutex_unlock(&RSSQueueMutex); -/* + pthread_mutex_unlock(&RSSQueueMutex); + At = GetNewHashPos(RSSFetchUrls, 0); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At); GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData); DeleteEntryFromHash(RSSFetchUrls, At); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); DeleteHashPos(&At); -*/ return eAbort; } @@ -516,7 +530,7 @@ eNextState RSSAggregatorTerminate(AsyncIO *IO) */ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) { - StrBuf *CfgData; + StrBuf *CfgData=NULL; StrBuf *CfgType; StrBuf *Line; rss_room_counter *Count = NULL; @@ -530,48 +544,53 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) const char *CfgPtr, *lPtr; const char *Err; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr)) { - CtdlLogPrintf(CTDL_DEBUG, + syslog(LOG_DEBUG, "rssclient: [%ld] %s already in progress.\n", qrbuf->QRnumber, qrbuf->QRname); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); return; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); - if (CtdlThreadCheckStop()) + if (server_shutting_down) return; /* Only do net processing for rooms that have netconfigs */ fd = open(filename, 0); if (fd <= 0) { - //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname); + //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname); return; } - if (CtdlThreadCheckStop()) + + if (server_shutting_down) return; + if (fstat(fd, &statbuf) == -1) { - CtdlLogPrintf(CTDL_DEBUG, "ERROR: could not stat configfile '%s' - %s\n", - filename, strerror(errno)); + syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n", + filename, strerror(errno)); return; } - if (CtdlThreadCheckStop()) + + if (server_shutting_down) return; + CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1); + if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) { close(fd); FreeStrBuf(&CfgData); - CtdlLogPrintf(CTDL_DEBUG, "ERROR: reading config '%s' - %s
\n", + syslog(LOG_DEBUG, "ERROR: reading config '%s' - %s
\n", filename, strerror(errno)); return; } close(fd); - if (CtdlThreadCheckStop()) + if (server_shutting_down) return; CfgPtr = NULL; @@ -599,7 +618,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) rncptr->Url = NewStrBuf(); StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|'); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr); use_this_rncptr = (rss_aggregator *)vptr; if (use_this_rncptr != NULL) @@ -625,29 +644,34 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); use_this_rncptr->roomlist_parts++; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); + + + FreeStrBuf(&rncptr->Url); + free(rncptr); + rncptr = NULL; continue; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); rncptr->ItemType = RSS_UNSET; rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); } } } if (Count != NULL) { Count->QRnumber = qrbuf->QRnumber; - citthread_mutex_lock(&RSSQueueMutex); - CtdlLogPrintf(CTDL_DEBUG, "rssclient: [%ld] %s now starting.\n", + pthread_mutex_lock(&RSSQueueMutex); + syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", qrbuf->QRnumber, qrbuf->QRname); Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); } FreeStrBuf(&CfgData); FreeStrBuf(&CfgType); @@ -665,6 +689,11 @@ void rssclient_scan(void) { long len; const char *Key; + /* Run no more than once every 15 minutes. */ + if ((time(NULL) - last_run) < 900) { + return; + } + /* * This is a simple concurrency check to make sure only one rssclient run * is done at a time. We could do this with a mutex, but since we @@ -674,13 +703,14 @@ void rssclient_scan(void) { if (doing_rssclient) return; doing_rssclient = 1; - CtdlLogPrintf(CTDL_DEBUG, "rssclient started\n"); + syslog(LOG_DEBUG, "rssclient started\n"); CtdlForEachRoom(rssclient_scan_room, NULL); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); it = GetNewHashPos(RSSFetchUrls, 0); - while (GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + while (!server_shutting_down && + GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && (vrptr != NULL)) { rptr = (rss_aggregator *)vrptr; if (rptr->RefCount == 0) @@ -688,16 +718,16 @@ void rssclient_scan(void) { UnlinkRSSAggregator(rptr); } DeleteHashPos(&it); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); - CtdlLogPrintf(CTDL_DEBUG, "rssclientscheduler ended\n"); + syslog(LOG_DEBUG, "rssclient ended\n"); doing_rssclient = 0; return; } void rss_cleanup(void) { - citthread_mutex_destroy(&RSSQueueMutex); + /* citthread_mutex_destroy(&RSSQueueMutex); TODO */ DeleteHash(&RSSFetchUrls); DeleteHash(&RSSQueueRooms); } @@ -707,10 +737,11 @@ CTDL_MODULE_INIT(rssclient) { if (threading) { - citthread_mutex_init(&RSSQueueMutex, NULL); + CtdlFillSystemContext(&rss_CC, "rssclient"); + pthread_mutex_init(&RSSQueueMutex, NULL); RSSQueueRooms = NewHash(1, lFlathash); RSSFetchUrls = NewHash(1, NULL); - CtdlLogPrintf(CTDL_INFO, "%s\n", curl_version()); + syslog(LOG_INFO, "%s\n", curl_version()); CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER); CtdlRegisterCleanupHook(rss_cleanup); }