X-Git-Url: https://code.citadel.org/?p=citadel.git;a=blobdiff_plain;f=citadel%2Fmodules%2Frssclient%2Fserv_rssclient.c;h=530bed03fe6f689b8bb96eb6af547744f980f33c;hp=db6d37fb0f3bf31c8eb555a0ae6b888d560b2ddd;hb=c855d497545dad80942a194624c111a54cd1fdc7;hpb=7cced4381b0497cf3d99a489bbb1a4f5375ded32 diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index db6d37fb0..530bed03f 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -66,14 +66,17 @@ time_t last_run = 0L; pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ HashList *RSSQueueRooms = NULL; /* rss_room_counter */ -HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */ +HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/ -eNextState RSSAggregatorTerminate(AsyncIO *IO); -eNextState RSSAggregatorShutdownAbort(AsyncIO *IO); +eNextState RSSAggregator_Terminate(AsyncIO *IO); +eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO); struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; -void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title) +void AppendLink(StrBuf *Message, + StrBuf *link, + StrBuf *LinkTitle, + const char *Title) { if (StrLength(link) > 0) { @@ -117,7 +120,6 @@ void DeleteRoomReference(long QRnumber) void UnlinkRooms(rss_aggregator *Cfg) { - DeleteRoomReference(Cfg->QRnumber); if (Cfg->OtherQRnumbers != NULL) { @@ -127,15 +129,16 @@ void UnlinkRooms(rss_aggregator *Cfg) void *vData; At = GetNewHashPos(Cfg->OtherQRnumbers, 0); - while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) && + while (! server_shutting_down && + GetNextHashPos(Cfg->OtherQRnumbers, + At, + &HKLen, &HK, + &vData) && (vData != NULL)) { long *lData = (long*) vData; DeleteRoomReference(*lData); } -/* - if (server_shutting_down) - break; / * TODO */ DeleteHashPos(&At); } @@ -155,37 +158,12 @@ void UnlinkRSSAggregator(rss_aggregator *Cfg) DeleteHashPos(&At); last_run = time(NULL); } -/* -eNextState FreeNetworkSaveMessage (AsyncIO *IO) -{ - networker_save_message *Ctx = (networker_save_message *) IO->Data; - - pthread_mutex_lock(&RSSQueueMutex); - Ctx->Cfg->RefCount --; - - if (Ctx->Cfg->RefCount == 0) - { - UnlinkRSSAggregator(Ctx->Cfg); - - } - pthread_mutex_unlock(&RSSQueueMutex); - CtdlFreeMessage(Ctx->Msg); - free_recipients(Ctx->recp); - FreeStrBuf(&Ctx->Message); - FreeStrBuf(&Ctx->MsgGUID); - ((struct CitContext*)IO->CitContext)->state = CON_IDLE; - ((struct CitContext*)IO->CitContext)->kill_me = 1; - free(Ctx); - last_run = time(NULL); - return eAbort; -} -*/ void FreeNetworkSaveMessage (void *vMsg) { networker_save_message *Msg = (networker_save_message *) vMsg; - CtdlFreeMessage(Msg->Msg); + CtdlFreeMessageContents(&Msg->Msg); FreeStrBuf(&Msg->Message); FreeStrBuf(&Msg->MsgGUID); free(Msg); @@ -202,14 +180,20 @@ eNextState RSSSaveMessage(AsyncIO *IO) const char *Key; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; - Ctx->ThisMsg->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message); + Ctx->ThisMsg->Msg.cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message); - CtdlSubmitMsg(Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0); + CtdlSubmitMsg(&Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0); /* write the uidl to the use table so we don't store this item again */ - cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); - - if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg)) + cdb_store(CDB_USETABLE, + SKEY(Ctx->ThisMsg->MsgGUID), + &Ctx->ThisMsg->ut, + sizeof(struct UseTable) ); + + if (GetNextHashPos(Ctx->Messages, + Ctx->Pos, + &len, &Key, + (void**) &Ctx->ThisMsg)) return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry); else return eAbort; @@ -222,25 +206,32 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; - /* Find out if we've already seen this item */ - strcpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO + strcpy(Ctx->ThisMsg->ut.ut_msgid, + ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO Ctx->ThisMsg->ut.ut_timestamp = time(NULL); cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID)); #ifndef DEBUG_RSS if (cdbut != NULL) { /* Item has already been seen */ - EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID)); + EV_syslog(LOG_DEBUG, + "%s has already been seen\n", + ChrPtr(Ctx->ThisMsg->MsgGUID)); cdb_free(cdbut); /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(Ctx->ThisMsg->MsgGUID), + cdb_store(CDB_USETABLE, + SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); - if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg)) - return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry); + if (GetNextHashPos(Ctx->Messages, + Ctx->Pos, + &len, &Key, + (void**) &Ctx->ThisMsg)) + return NextDBOperation( + IO, + RSS_FetchNetworkUsetableEntry); else return eAbort; } @@ -251,48 +242,26 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) return eSendMore; } } -/* -void RSSAddSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregat *Cfg) -{ - networker_save_message *Ctx; - - pthread_mutex_lock(&RSSQueueMutex); - Cfg->RefCount ++; - pthread_mutex_unlock(&RSSQueueMutex); - - - Ctx = (networker_save_message *) malloc(sizeof(networker_save_message)); - memset(Ctx, 0, sizeof(networker_save_message)); - - Ctx->MsgGUID = MsgGUID; - Ctx->Message = MessageBody; - Ctx->Msg = Msg; - Ctx->Cfg = Cfg; - Ctx->recp = recp; - Ctx->IO.Data = Ctx; - Ctx->IO.CitContext = CloneContext(&rss_CC); - Ctx->IO.Terminate = FreeNetworkSaveMessage; - Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage; - QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry); -} -*/ /* * Commit a fetched and parsed RSS item to disk */ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) { - + networker_save_message *SaveMsg; struct MD5Context md5context; u_char rawdigest[MD5_DIGEST_LEN]; - struct CtdlMessage *msg; int msglen = 0; StrBuf *Message; StrBuf *guid; AsyncIO *IO = &Cfg->IO; - int n; - + + + SaveMsg = (networker_save_message *) malloc( + sizeof(networker_save_message)); + memset(SaveMsg, 0, sizeof(networker_save_message)); + /* Construct a GUID to use in the S_USETABLE table. * If one is not present in the item itself, make one up. */ @@ -305,13 +274,16 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) else { MD5Init(&md5context); if (ri->title != NULL) { - MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title)); + MD5Update(&md5context, + (const unsigned char*)SKEY(ri->title)); } if (ri->link != NULL) { - MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link)); + MD5Update(&md5context, + (const unsigned char*)SKEY(ri->link)); } MD5Final(rawdigest, &md5context); - guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/); + guid = NewStrBufPlain(NULL, + MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/); StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN); StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0); } @@ -320,23 +292,21 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) EVM_syslog(LOG_DEBUG, "RSS: translating item...\n"); if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY("")); StrBufSpaceToBlank(ri->description); - msg = malloc(sizeof(struct CtdlMessage)); - memset(msg, 0, sizeof(struct CtdlMessage)); - msg->cm_magic = CTDLMESSAGE_MAGIC; - msg->cm_anon_type = MES_NORMAL; - msg->cm_format_type = FMT_RFC822; + SaveMsg->Msg.cm_magic = CTDLMESSAGE_MAGIC; + SaveMsg->Msg.cm_anon_type = MES_NORMAL; + SaveMsg->Msg.cm_format_type = FMT_RFC822; if (ri->guid != NULL) { - msg->cm_fields['E'] = strdup(ChrPtr(ri->guid)); + SaveMsg->Msg.cm_fields['E'] = strdup(ChrPtr(ri->guid)); } if (ri->author_or_creator != NULL) { char *From; StrBuf *Encoded = NULL; int FromAt; - + From = html_to_ascii(ChrPtr(ri->author_or_creator), - StrLength(ri->author_or_creator), + StrLength(ri->author_or_creator), 512, 0); StrBufPlain(ri->author_or_creator, From, -1); StrBufTrim(ri->author_or_creator); @@ -346,21 +316,27 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) if (!FromAt && StrLength (ri->author_email) > 0) { StrBufRFC2047encode(&Encoded, ri->author_or_creator); - msg->cm_fields['A'] = SmashStrBuf(&Encoded); - msg->cm_fields['P'] = SmashStrBuf(&ri->author_email); + SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded); + SaveMsg->Msg.cm_fields['P'] = + SmashStrBuf(&ri->author_email); } else { if (FromAt) { - msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator); - msg->cm_fields['P'] = strdup(msg->cm_fields['A']); + SaveMsg->Msg.cm_fields['A'] = + SmashStrBuf(&ri->author_or_creator); + SaveMsg->Msg.cm_fields['P'] = + strdup(SaveMsg->Msg.cm_fields['A']); } - else + else { - StrBufRFC2047encode(&Encoded, ri->author_or_creator); - msg->cm_fields['A'] = SmashStrBuf(&Encoded); - msg->cm_fields['P'] = strdup("rss@localhost"); + StrBufRFC2047encode(&Encoded, + ri->author_or_creator); + SaveMsg->Msg.cm_fields['A'] = + SmashStrBuf(&Encoded); + SaveMsg->Msg.cm_fields['P'] = + strdup("rss@localhost"); } if (ri->pubdate <= 0) { @@ -369,10 +345,10 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) } } else { - msg->cm_fields['A'] = strdup("rss"); + SaveMsg->Msg.cm_fields['A'] = strdup("rss"); } - msg->cm_fields['N'] = strdup(NODENAME); + SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME); if (ri->title != NULL) { long len; char *Sbj; @@ -394,21 +370,23 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) StrBufTrim(Encoded); StrBufRFC2047encode(&QPEncoded, Encoded); - msg->cm_fields['U'] = SmashStrBuf(&QPEncoded); + SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded); FreeStrBuf(&Encoded); } - msg->cm_fields['T'] = malloc(64); - snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate); + SaveMsg->Msg.cm_fields['T'] = malloc(64); + snprintf(SaveMsg->Msg.cm_fields['T'], 64, "%ld", ri->pubdate); if (ri->channel_title != NULL) { if (StrLength(ri->channel_title) > 0) { - msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title)); + SaveMsg->Msg.cm_fields['O'] = + strdup(ChrPtr(ri->channel_title)); } } - if (ri->link == NULL) + if (ri->link == NULL) ri->link = NewStrBufPlain(HKEY("")); #if 0 /* temporarily disable shorter urls. */ - msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description); + SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] = + GetShorterUrls(ri->description); #endif msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ; @@ -419,7 +397,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n" "\n")); #if 0 /* disable shorter url for now. */ - msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message); + SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message); #endif StrBufAppendBuf(Message, ri->description, 0); StrBufAppendBufPlain(Message, HKEY("

\n"), 0); @@ -428,16 +406,8 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this"); StrBufAppendBufPlain(Message, HKEY("\n"), 0); - - - networker_save_message *SaveMsg; - - SaveMsg = (networker_save_message *) malloc(sizeof(networker_save_message)); - memset(SaveMsg, 0, sizeof(networker_save_message)); - SaveMsg->MsgGUID = guid; SaveMsg->Message = Message; - SaveMsg->Msg = msg; n = GetCount(Cfg->Messages) + 1; Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage); @@ -451,11 +421,9 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) int rss_do_fetching(rss_aggregator *Cfg) { rss_item *ri; - time_t now; - AsyncIO *IO; - now = time(NULL); + now = time(NULL); if ((Cfg->next_poll != 0) && (now < Cfg->next_poll)) return 0; @@ -463,31 +431,27 @@ int rss_do_fetching(rss_aggregator *Cfg) ri = (rss_item*) malloc(sizeof(rss_item)); memset(ri, 0, sizeof(rss_item)); Cfg->Item = ri; - IO = &Cfg->IO; - IO->CitContext = CloneContext(&rss_CC); - IO->Data = Cfg; - - safestrncpy(((CitContext*)IO->CitContext)->cs_host, - ChrPtr(Cfg->Url), - sizeof(((CitContext*)IO->CitContext)->cs_host)); - syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url)); - ParseURL(&IO->ConnectMe, Cfg->Url, 80); - CurlPrepareURL(IO->ConnectMe); - - if (! evcurl_init(IO, -// Ctx, - NULL, - "Citadel RSS Client", - ParseRSSReply, - RSSAggregatorTerminate, - RSSAggregatorShutdownAbort)) + if (! InitcURLIOStruct(&Cfg->IO, + Cfg, + "Citadel RSS Client", + RSSAggregator_ParseReply, + RSSAggregator_Terminate, + RSSAggregator_ShutdownAbort)) { - syslog(LOG_DEBUG, "Unable to initialize libcurl.\n"); + syslog(LOG_ALERT, "Unable to initialize libcurl.\n"); return 0; } - QueueCurlContext(IO); + safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host, + ChrPtr(Cfg->Url), + sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host)); + + syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url)); + ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80); + CurlPrepareURL(Cfg->IO.ConnectMe); + + QueueCurlContext(&Cfg->IO); return 1; } @@ -531,7 +495,7 @@ void DeleteRssCfg(void *vptr) free(rncptr); } -eNextState RSSAggregatorTerminate(AsyncIO *IO) +eNextState RSSAggregator_Terminate(AsyncIO *IO) { rss_aggregator *rncptr = (rss_aggregator *)IO->Data; @@ -541,7 +505,7 @@ eNextState RSSAggregatorTerminate(AsyncIO *IO) UnlinkRSSAggregator(rncptr); return eAbort; } -eNextState RSSAggregatorShutdownAbort(AsyncIO *IO) +eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO) { const char *pUrl; rss_aggregator *rncptr = (rss_aggregator *)IO->Data; @@ -579,10 +543,10 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) pthread_mutex_lock(&RSSQueueMutex); if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr)) { - syslog(LOG_DEBUG, - "rssclient: [%ld] %s already in progress.\n", - qrbuf->QRnumber, - qrbuf->QRname); + syslog(LOG_DEBUG, + "rssclient: [%ld] %s already in progress.\n", + qrbuf->QRnumber, + qrbuf->QRname); pthread_mutex_unlock(&RSSQueueMutex); return; } @@ -592,11 +556,13 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) if (server_shutting_down) return; - + /* Only do net processing for rooms that have netconfigs */ fd = open(filename, 0); if (fd <= 0) { - //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname); + /* syslog(LOG_DEBUG, + "rssclient: %s no config.\n", + qrbuf->QRname); */ return; } @@ -604,8 +570,10 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) return; if (fstat(fd, &statbuf) == -1) { - syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n", - filename, strerror(errno)); + syslog(LOG_DEBUG, + "ERROR: could not stat configfile '%s' - %s\n", + filename, + strerror(errno)); return; } @@ -624,7 +592,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) close(fd); if (server_shutting_down) return; - + CfgPtr = NULL; CfgType = NewStrBuf(); Line = NewStrBufPlain(NULL, StrLength(CfgData)); @@ -656,16 +624,20 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) if (use_this_rncptr != NULL) { long *QRnumber; - StrBufAppendBufPlain(use_this_rncptr->rooms, - qrbuf->QRname, + StrBufAppendBufPlain(use_this_rncptr->rooms, + qrbuf->QRname, -1, 0); if (use_this_rncptr->roomlist_parts == 1) { - use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash); + use_this_rncptr->OtherQRnumbers = + NewHash(1, lFlathash); } QRnumber = (long*)malloc(sizeof(long)); *QRnumber = qrbuf->QRnumber; - Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); + Put(use_this_rncptr->OtherQRnumbers, + LKEY(qrbuf->QRnumber), + QRnumber, + NULL); use_this_rncptr->roomlist_parts++; pthread_mutex_unlock(&RSSQueueMutex); @@ -678,7 +650,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) pthread_mutex_unlock(&RSSQueueMutex); rncptr->ItemType = RSS_UNSET; - + rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1); pthread_mutex_lock(&RSSQueueMutex); @@ -718,8 +690,8 @@ void rssclient_scan(void) { } /* - * This is a simple concurrency check to make sure only one rssclient run - * is done at a time. We could do this with a mutex, but since we + * This is a simple concurrency check to make sure only one rssclient + * run is done at a time. We could do this with a mutex, but since we * don't really require extremely fine granularity here, we'll do it * with a static variable instead. */ @@ -728,6 +700,7 @@ void rssclient_scan(void) { if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0)) return; + become_session(&rss_CC); syslog(LOG_DEBUG, "rssclient started\n"); CtdlForEachRoom(rssclient_scan_room, NULL); @@ -735,7 +708,7 @@ void rssclient_scan(void) { it = GetNewHashPos(RSSFetchUrls, 0); while (!server_shutting_down && - GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && (vrptr != NULL)) { rptr = (rss_aggregator *)vrptr; if (!rss_do_fetching(rptr)) @@ -767,7 +740,7 @@ CTDL_MODULE_INIT(rssclient) RSSFetchUrls = NewHash(1, NULL); syslog(LOG_INFO, "%s\n", curl_version()); CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER); - CtdlRegisterCleanupHook(rss_cleanup); + CtdlRegisterCleanupHook(rss_cleanup); } return "rssclient"; }