X-Git-Url: https://code.citadel.org/?p=citadel.git;a=blobdiff_plain;f=citadel%2Fmodules%2Frssclient%2Fserv_rssclient.c;h=b8d1b4495ec9cdd260d13ff61b0caf50094f09e6;hp=25544abce3df924b30f9cbde266837fc2bafabd6;hb=91acc8a0f1f9c4638613c6d2c177e28db1027366;hpb=958815c1a75e837a0b8fd9d4a8db9f016275a5e2 diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index 25544abce..b8d1b4495 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -62,12 +62,15 @@ #define TMP_SHORTER_URL_OFFSET 0xFE #define TMP_SHORTER_URLS 0xFD -citthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ +time_t last_run = 0L; + +pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ HashList *RSSQueueRooms = NULL; /* rss_room_counter */ HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */ eNextState RSSAggregatorTerminate(AsyncIO *IO); +struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title) @@ -137,10 +140,12 @@ void UnlinkRooms(rss_aggregator *Cfg) long *lData = (long*) vData; DeleteRoomReference(*lData); } +/* + if (server_shutting_down) + break; / * TODO */ DeleteHashPos(&At); } - } void UnlinkRSSAggregator(rss_aggregator *Cfg) @@ -155,13 +160,14 @@ void UnlinkRSSAggregator(rss_aggregator *Cfg) DeleteEntryFromHash(RSSFetchUrls, At); } DeleteHashPos(&At); + last_run = time(NULL); } eNextState FreeNetworkSaveMessage (AsyncIO *IO) { networker_save_message *Ctx = (networker_save_message *) IO->Data; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); Ctx->Cfg->RefCount --; if (Ctx->Cfg->RefCount == 0) @@ -169,13 +175,14 @@ eNextState FreeNetworkSaveMessage (AsyncIO *IO) UnlinkRSSAggregator(Ctx->Cfg); } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); CtdlFreeMessage(Ctx->Msg); free_recipients(Ctx->recp); FreeStrBuf(&Ctx->Message); FreeStrBuf(&Ctx->MsgGUID); free(Ctx); + last_run = time(NULL); return eAbort; } @@ -198,8 +205,6 @@ eNextState RSSSaveMessage(AsyncIO *IO) return eTerminateConnection; } -// TODO: relink me: ExpandShortUrls(ri->description); - eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) { struct cdbdata *cdbut; @@ -242,7 +247,7 @@ void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf Ctx->Cfg = Cfg; Ctx->recp = recp; Ctx->IO.Data = Ctx; - Ctx->IO.CitContext = CloneContext(CC); + Ctx->IO.CitContext = CloneContext(&rss_CC); Ctx->IO.Terminate = FreeNetworkSaveMessage; Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage; QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry); @@ -341,6 +346,10 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) StrBufRFC2047encode(&Encoded, ri->author_or_creator); msg->cm_fields['A'] = SmashStrBuf(&Encoded); msg->cm_fields['P'] = strdup("rss@localhost"); + + } + if (ri->pubdate <= 0) { + ri->pubdate = time(NULL); } } } @@ -429,7 +438,7 @@ int rss_do_fetching(rss_aggregator *Cfg) memset(ri, 0, sizeof(rss_item)); Cfg->Item = ri; IO = &Cfg->IO; - IO->CitContext = CloneContext(CC); + IO->CitContext = CloneContext(&rss_CC); IO->Data = Cfg; @@ -453,7 +462,6 @@ int rss_do_fetching(rss_aggregator *Cfg) } - void DeleteRssCfg(void *vptr) { rss_aggregator *rncptr = (rss_aggregator *)vptr; @@ -462,8 +470,9 @@ void DeleteRssCfg(void *vptr) FreeStrBuf(&rncptr->rooms); FreeStrBuf(&rncptr->CData); FreeStrBuf(&rncptr->Key); - + FreeStrBuf(&rncptr->IO.HttpReq.ReplyData); DeleteHash(&rncptr->OtherQRnumbers); + FreeURL(&rncptr->IO.ConnectMe); if (rncptr->Item != NULL) { @@ -487,27 +496,28 @@ void DeleteRssCfg(void *vptr) eNextState RSSAggregatorTerminate(AsyncIO *IO) { rss_aggregator *rncptr = (rss_aggregator *)IO->Data; - HashPos *At; - long HKLen; - const char *HK; - void *vData; - - citthread_mutex_lock(&RSSQueueMutex); + /* + HashPos *At; + long HKLen; + const char *HK; + void *vData; + */ + pthread_mutex_lock(&RSSQueueMutex); rncptr->RefCount --; if (rncptr->RefCount == 0) { UnlinkRSSAggregator(rncptr); } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); /* At = GetNewHashPos(RSSFetchUrls, 0); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At); GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData); DeleteEntryFromHash(RSSFetchUrls, At); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); DeleteHashPos(&At); */ @@ -519,7 +529,7 @@ eNextState RSSAggregatorTerminate(AsyncIO *IO) */ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) { - StrBuf *CfgData; + StrBuf *CfgData=NULL; StrBuf *CfgType; StrBuf *Line; rss_room_counter *Count = NULL; @@ -533,21 +543,21 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) const char *CfgPtr, *lPtr; const char *Err; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr)) { syslog(LOG_DEBUG, "rssclient: [%ld] %s already in progress.\n", qrbuf->QRnumber, qrbuf->QRname); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); return; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); - if (CtdlThreadCheckStop()) + if (server_shutting_down) return; /* Only do net processing for rooms that have netconfigs */ @@ -556,16 +566,21 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname); return; } - if (CtdlThreadCheckStop()) + + if (server_shutting_down) return; + if (fstat(fd, &statbuf) == -1) { syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n", - filename, strerror(errno)); + filename, strerror(errno)); return; } - if (CtdlThreadCheckStop()) + + if (server_shutting_down) return; + CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1); + if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) { close(fd); FreeStrBuf(&CfgData); @@ -574,7 +589,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) return; } close(fd); - if (CtdlThreadCheckStop()) + if (server_shutting_down) return; CfgPtr = NULL; @@ -602,7 +617,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) rncptr->Url = NewStrBuf(); StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|'); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr); use_this_rncptr = (rss_aggregator *)vptr; if (use_this_rncptr != NULL) @@ -628,29 +643,34 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); use_this_rncptr->roomlist_parts++; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); + + + FreeStrBuf(&rncptr->Url); + free(rncptr); + rncptr = NULL; continue; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); rncptr->ItemType = RSS_UNSET; rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); } } } if (Count != NULL) { Count->QRnumber = qrbuf->QRnumber; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", qrbuf->QRnumber, qrbuf->QRname); Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); } FreeStrBuf(&CfgData); FreeStrBuf(&CfgType); @@ -668,6 +688,11 @@ void rssclient_scan(void) { long len; const char *Key; + /* Run no more than once every 15 minutes. */ + if ((time(NULL) - last_run) < 900) { + return; + } + /* * This is a simple concurrency check to make sure only one rssclient run * is done at a time. We could do this with a mutex, but since we @@ -680,10 +705,11 @@ void rssclient_scan(void) { syslog(LOG_DEBUG, "rssclient started\n"); CtdlForEachRoom(rssclient_scan_room, NULL); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); it = GetNewHashPos(RSSFetchUrls, 0); - while (GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + while (!server_shutting_down && + GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && (vrptr != NULL)) { rptr = (rss_aggregator *)vrptr; if (rptr->RefCount == 0) @@ -691,7 +717,7 @@ void rssclient_scan(void) { UnlinkRSSAggregator(rptr); } DeleteHashPos(&it); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); syslog(LOG_DEBUG, "rssclient ended\n"); doing_rssclient = 0; @@ -700,7 +726,7 @@ void rssclient_scan(void) { void rss_cleanup(void) { - citthread_mutex_destroy(&RSSQueueMutex); + /* citthread_mutex_destroy(&RSSQueueMutex); TODO */ DeleteHash(&RSSFetchUrls); DeleteHash(&RSSQueueRooms); } @@ -710,7 +736,8 @@ CTDL_MODULE_INIT(rssclient) { if (threading) { - citthread_mutex_init(&RSSQueueMutex, NULL); + CtdlFillSystemContext(&rss_CC, "rssclient"); + pthread_mutex_init(&RSSQueueMutex, NULL); RSSQueueRooms = NewHash(1, lFlathash); RSSFetchUrls = NewHash(1, NULL); syslog(LOG_INFO, "%s\n", curl_version());