X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Frssclient%2Fserv_rssclient.c;h=e5060ec457a69d5095d1a6696a1a2ff756865977;hb=aab1bab5b2fb69f588df91896e4eb65bd5844bfd;hp=b3559f473c2524cb1ce7d230dd14dcd762471d85;hpb=3a3c5f14f7bad6637d48f64c559476904a7f5e9d;p=citadel.git diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index b3559f473..e5060ec45 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -62,12 +62,13 @@ #define TMP_SHORTER_URL_OFFSET 0xFE #define TMP_SHORTER_URLS 0xFD -citthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ HashList *RSSQueueRooms = NULL; /* rss_room_counter */ HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */ eNextState RSSAggregatorTerminate(AsyncIO *IO); +struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title) @@ -137,10 +138,12 @@ void UnlinkRooms(rss_aggregator *Cfg) long *lData = (long*) vData; DeleteRoomReference(*lData); } +/* + if (server_shutting_down) + break; /* TODO */ DeleteHashPos(&At); } - } void UnlinkRSSAggregator(rss_aggregator *Cfg) @@ -161,7 +164,7 @@ eNextState FreeNetworkSaveMessage (AsyncIO *IO) { networker_save_message *Ctx = (networker_save_message *) IO->Data; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); Ctx->Cfg->RefCount --; if (Ctx->Cfg->RefCount == 0) @@ -169,7 +172,7 @@ eNextState FreeNetworkSaveMessage (AsyncIO *IO) UnlinkRSSAggregator(Ctx->Cfg); } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); CtdlFreeMessage(Ctx->Msg); free_recipients(Ctx->recp); @@ -240,7 +243,7 @@ void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf Ctx->Cfg = Cfg; Ctx->recp = recp; Ctx->IO.Data = Ctx; - Ctx->IO.CitContext = CloneContext(CC); + Ctx->IO.CitContext = CloneContext(&rss_CC); Ctx->IO.Terminate = FreeNetworkSaveMessage; Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage; QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry); @@ -339,6 +342,10 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) StrBufRFC2047encode(&Encoded, ri->author_or_creator); msg->cm_fields['A'] = SmashStrBuf(&Encoded); msg->cm_fields['P'] = strdup("rss@localhost"); + + } + if (ri->pubdate <= 0) { + ri->pubdate = time(NULL); } } } @@ -427,7 +434,7 @@ int rss_do_fetching(rss_aggregator *Cfg) memset(ri, 0, sizeof(rss_item)); Cfg->Item = ri; IO = &Cfg->IO; - IO->CitContext = CloneContext(CC); + IO->CitContext = CloneContext(&rss_CC); IO->Data = Cfg; @@ -451,7 +458,6 @@ int rss_do_fetching(rss_aggregator *Cfg) } - void DeleteRssCfg(void *vptr) { rss_aggregator *rncptr = (rss_aggregator *)vptr; @@ -460,8 +466,9 @@ void DeleteRssCfg(void *vptr) FreeStrBuf(&rncptr->rooms); FreeStrBuf(&rncptr->CData); FreeStrBuf(&rncptr->Key); - + FreeStrBuf(&rncptr->IO.HttpReq.ReplyData); DeleteHash(&rncptr->OtherQRnumbers); + FreeURL(&rncptr->IO.ConnectMe); if (rncptr->Item != NULL) { @@ -485,27 +492,28 @@ void DeleteRssCfg(void *vptr) eNextState RSSAggregatorTerminate(AsyncIO *IO) { rss_aggregator *rncptr = (rss_aggregator *)IO->Data; - HashPos *At; - long HKLen; - const char *HK; - void *vData; - - citthread_mutex_lock(&RSSQueueMutex); + /* + HashPos *At; + long HKLen; + const char *HK; + void *vData; + */ + pthread_mutex_lock(&RSSQueueMutex); rncptr->RefCount --; if (rncptr->RefCount == 0) { UnlinkRSSAggregator(rncptr); } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); /* At = GetNewHashPos(RSSFetchUrls, 0); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At); GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData); DeleteEntryFromHash(RSSFetchUrls, At); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); DeleteHashPos(&At); */ @@ -517,7 +525,7 @@ eNextState RSSAggregatorTerminate(AsyncIO *IO) */ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) { - StrBuf *CfgData; + StrBuf *CfgData=NULL; StrBuf *CfgType; StrBuf *Line; rss_room_counter *Count = NULL; @@ -531,21 +539,21 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) const char *CfgPtr, *lPtr; const char *Err; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr)) { syslog(LOG_DEBUG, "rssclient: [%ld] %s already in progress.\n", qrbuf->QRnumber, qrbuf->QRname); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); return; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); - if (CtdlThreadCheckStop()) + if (server_shutting_down) return; /* Only do net processing for rooms that have netconfigs */ @@ -554,16 +562,21 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname); return; } - if (CtdlThreadCheckStop()) + + if (server_shutting_down) return; + if (fstat(fd, &statbuf) == -1) { syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n", - filename, strerror(errno)); + filename, strerror(errno)); return; } - if (CtdlThreadCheckStop()) + + if (server_shutting_down) return; + CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1); + if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) { close(fd); FreeStrBuf(&CfgData); @@ -572,7 +585,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) return; } close(fd); - if (CtdlThreadCheckStop()) + if (server_shutting_down) return; CfgPtr = NULL; @@ -600,7 +613,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) rncptr->Url = NewStrBuf(); StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|'); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr); use_this_rncptr = (rss_aggregator *)vptr; if (use_this_rncptr != NULL) @@ -626,29 +639,34 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); use_this_rncptr->roomlist_parts++; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); + + + FreeStrBuf(&rncptr->Url); + free(rncptr); + rncptr = NULL; continue; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); rncptr->ItemType = RSS_UNSET; rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); } } } if (Count != NULL) { Count->QRnumber = qrbuf->QRnumber; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", qrbuf->QRnumber, qrbuf->QRname); Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); } FreeStrBuf(&CfgData); FreeStrBuf(&CfgType); @@ -683,10 +701,11 @@ void rssclient_scan(void) { syslog(LOG_DEBUG, "rssclient started\n"); CtdlForEachRoom(rssclient_scan_room, NULL); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); it = GetNewHashPos(RSSFetchUrls, 0); - while (GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + while (!server_shutting_down && + GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && (vrptr != NULL)) { rptr = (rss_aggregator *)vrptr; if (rptr->RefCount == 0) @@ -694,7 +713,7 @@ void rssclient_scan(void) { UnlinkRSSAggregator(rptr); } DeleteHashPos(&it); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); syslog(LOG_DEBUG, "rssclient ended\n"); doing_rssclient = 0; @@ -703,7 +722,7 @@ void rssclient_scan(void) { void rss_cleanup(void) { - citthread_mutex_destroy(&RSSQueueMutex); + /* citthread_mutex_destroy(&RSSQueueMutex); TODO */ DeleteHash(&RSSFetchUrls); DeleteHash(&RSSQueueRooms); } @@ -713,7 +732,8 @@ CTDL_MODULE_INIT(rssclient) { if (threading) { - citthread_mutex_init(&RSSQueueMutex, NULL); + CtdlFillSystemContext(&rss_CC, "rssclient"); + pthread_mutex_init(&RSSQueueMutex, NULL); RSSQueueRooms = NewHash(1, lFlathash); RSSFetchUrls = NewHash(1, NULL); syslog(LOG_INFO, "%s\n", curl_version());