From d2f099415dcc15ed038aa148701bcd05b8d2c965 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Thu, 29 Dec 2011 12:53:36 +0100 Subject: [PATCH] Cleanup of shutdown of event contexts - centraly mark citcontexts connected to AsyncIO structs for deletion - free AsyncIO members in FreeAsyncIOContents() and make shure its called all over the place - sort stopping of contexts by type - close our DB/event - add pipes on exit - reshuffle RSS Aggregator; all parsing etc. to rss_atom_parser.c, all free/start/stop to serv_rssclient.c - remove unused stuff now discarded from the rss header - rename all instances of the rss aggregator struct to RSSAggr --- citadel/event_client.c | 20 +- citadel/modules/c-ares-dns/serv_c-ares-dns.c | 5 + .../modules/eventclient/serv_eventclient.c | 10 +- citadel/modules/rssclient/rss_atom_parser.c | 598 ++++++++++++------ citadel/modules/rssclient/rss_atom_parser.h | 4 +- citadel/modules/rssclient/serv_rssclient.c | 372 +++-------- citadel/modules/smtp/serv_smtpeventclient.c | 3 + 7 files changed, 518 insertions(+), 494 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index cdcee9f6e..cea9a24f7 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -121,9 +121,6 @@ void ShutDownDBCLient(AsyncIO *IO) assert(IO->Terminate); IO->Terminate(IO); - - Ctx->state = CON_IDLE; - Ctx->kill_me = 1; } void @@ -227,23 +224,34 @@ eNextState QueueCurlContext(AsyncIO *IO) return eSendReply; } +void DestructCAres(AsyncIO *IO); void FreeAsyncIOContents(AsyncIO *IO) { + CitContext *Ctx = IO->CitContext; + FreeStrBuf(&IO->IOBuf); FreeStrBuf(&IO->SendBuf.Buf); FreeStrBuf(&IO->RecvBuf.Buf); + + DestructCAres(IO); + + FreeURL(&IO->ConnectMe); + FreeStrBuf(&IO->HttpReq.ReplyData); + + Ctx->state = CON_IDLE; + Ctx->kill_me = 1; } void StopClientWatchers(AsyncIO *IO) { + ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); - ev_io_stop(event_base, &IO->conn_event); ev_idle_stop(event_base, &IO->unwind_stack); + ev_io_stop(event_base, &IO->conn_event); ev_io_stop(event_base, &IO->send_event); ev_io_stop(event_base, &IO->recv_event); - ev_timer_stop (event_base, &IO->rw_timeout); close(IO->SendBuf.fd); IO->SendBuf.fd = 0; IO->RecvBuf.fd = 0; @@ -267,8 +275,6 @@ void ShutDownCLient(AsyncIO *IO) } assert(IO->Terminate); IO->Terminate(IO); - Ctx->state = CON_IDLE; - Ctx->kill_me = 1; } diff --git a/citadel/modules/c-ares-dns/serv_c-ares-dns.c b/citadel/modules/c-ares-dns/serv_c-ares-dns.c index 642abb260..6c56fd633 100644 --- a/citadel/modules/c-ares-dns/serv_c-ares-dns.c +++ b/citadel/modules/c-ares-dns/serv_c-ares-dns.c @@ -272,6 +272,11 @@ void QueryCbDone(AsyncIO *IO) ev_idle_stop(event_base, &IO->unwind_stack); } +void DestructCAres(AsyncIO *IO) +{ + ares_destroy_options(&IO->DNS.Options); +} + void InitC_ares_dns(AsyncIO *IO) { diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 1be3a8641..00ff1f722 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -569,6 +569,8 @@ void *client_event_thread(void *arg) DeleteHash(&InboundEventQueues[1]); /* citthread_mutex_destroy(&EventQueueMutex); TODO */ evcurl_shutdown(); + close(event_add_pipe[0]); + close(event_add_pipe[1]); return(NULL); } @@ -692,6 +694,9 @@ void *db_event_thread(void *arg) DBInboundEventQueue = NULL; DeleteHash(&DBInboundEventQueues[0]); DeleteHash(&DBInboundEventQueues[1]); + + close(evdb_add_pipe[0]); + close(evdb_add_pipe[1]); /* citthread_mutex_destroy(&DBEventQueueMutex); TODO */ return(NULL); @@ -717,9 +722,8 @@ CTDL_MODULE_INIT(event_client) CtdlRegisterCleanupHook(ShutDownEventQueues); InitEventQueue(); DBInitEventQueue(); - CtdlThreadCreate(/*"Client event", */ client_event_thread); - CtdlThreadCreate(/*"DB event", */db_event_thread); -/// todo register shutdown callback. + CtdlThreadCreate(client_event_thread); + CtdlThreadCreate(db_event_thread); } return "event"; } diff --git a/citadel/modules/rssclient/rss_atom_parser.c b/citadel/modules/rssclient/rss_atom_parser.c index e7065bde3..dd92e2d78 100644 --- a/citadel/modules/rssclient/rss_atom_parser.c +++ b/citadel/modules/rssclient/rss_atom_parser.c @@ -58,28 +58,7 @@ #include "event_client.h" #include "rss_atom_parser.h" -extern pthread_mutex_t RSSQueueMutex; - -HashList *StartHandlers = NULL; -HashList *EndHandlers = NULL; -HashList *KnownNameSpaces = NULL; -void AddRSSStartHandler(rss_handler_func Handler, int Flags, const char *key, long len) -{ - rss_xml_handler *h; - h = (rss_xml_handler*) malloc(sizeof (rss_xml_handler)); - h->Flags = Flags; - h->Handler = Handler; - Put(StartHandlers, key, len, h, NULL); -} -void AddRSSEndHandler(rss_handler_func Handler, int Flags, const char *key, long len) -{ - rss_xml_handler *h; - h = (rss_xml_handler*) malloc(sizeof (rss_xml_handler)); - h->Flags = Flags; - h->Handler = Handler; - Put(EndHandlers, key, len, h, NULL); -} - +void rss_save_item(rss_item *ri, rss_aggregator *Cfg); /* @@ -132,142 +111,17 @@ void flush_rss_item(rss_item *ri) FreeStrBuf(&ri->author_email); FreeStrBuf(&ri->author_url); FreeStrBuf(&ri->description); -} - -void rss_xml_start(void *data, const char *supplied_el, const char **attr) -{ - rss_xml_handler *h; - rss_aggregator *rssc = (rss_aggregator*) data; - rss_item *ri = rssc->Item; - void *pv; - const char *pel; - char *sep = NULL; - /* Axe the namespace, we don't care about it */ -/// syslog(LOG_DEBUG, "RSS: supplied el %d: %s...\n", rssc->Cfg->ItemType, supplied_el); - pel = supplied_el; - while (sep = strchr(pel, ':'), sep) { - pel = sep + 1; - } - - if (pel != supplied_el) - { - void *v; - - if (!GetHash(KnownNameSpaces, - supplied_el, - pel - supplied_el - 1, - &v)) - { -#ifdef DEBUG_RSS - syslog(LOG_DEBUG, "RSS: START ignoring because of wrong namespace [%s]\n", - supplied_el); -#endif - return; - } - } - - StrBufPlain(rssc->Key, pel, -1); - StrBufLowerCase(rssc->Key); - if (GetHash(StartHandlers, SKEY(rssc->Key), &pv)) - { - rssc->Current = h = (rss_xml_handler*) pv; - - if (((h->Flags & RSS_UNSET) != 0) && - (rssc->ItemType == RSS_UNSET)) - { - h->Handler(rssc->CData, ri, rssc, attr); - } - else if (((h->Flags & RSS_RSS) != 0) && - (rssc->ItemType == RSS_RSS)) - { - h->Handler(rssc->CData, ri, rssc, attr); - } - else if (((h->Flags & RSS_ATOM) != 0) && - (rssc->ItemType == RSS_ATOM)) - { - h->Handler(rssc->CData, ri, rssc, attr); - } -#ifdef DEBUG_RSS - else - syslog(LOG_DEBUG, "RSS: START unhandled: [%s] [%s]...\n", pel, supplied_el); -#endif - } -#ifdef DEBUG_RSS - else - syslog(LOG_DEBUG, "RSS: START unhandled: [%s] [%s]...\n", pel, supplied_el); -#endif -} - -void rss_xml_end(void *data, const char *supplied_el) -{ - rss_xml_handler *h; - rss_aggregator *rssc = (rss_aggregator*) data; - rss_item *ri = rssc->Item; - const char *pel; - char *sep = NULL; - void *pv; - - /* Axe the namespace, we don't care about it */ - pel = supplied_el; - while (sep = strchr(pel, ':'), sep) { - pel = sep + 1; - } -// syslog(LOG_DEBUG, "RSS: END %s...\n", el); - if (pel != supplied_el) - { - void *v; - - if (!GetHash(KnownNameSpaces, - supplied_el, - pel - supplied_el - 1, - &v)) - { -#ifdef DEBUG_RSS - syslog(LOG_DEBUG, "RSS: END ignoring because of wrong namespace [%s] = [%s]\n", - supplied_el, ChrPtr(rssc->CData)); -#endif - FlushStrBuf(rssc->CData); - return; - } - } - - StrBufPlain(rssc->Key, pel, -1); - StrBufLowerCase(rssc->Key); - if (GetHash(EndHandlers, SKEY(rssc->Key), &pv)) - { - h = (rss_xml_handler*) pv; - - if (((h->Flags & RSS_UNSET) != 0) && - (rssc->ItemType == RSS_UNSET)) - { - h->Handler(rssc->CData, ri, rssc, NULL); - } - else if (((h->Flags & RSS_RSS) != 0) && - (rssc->ItemType == RSS_RSS)) - { - h->Handler(rssc->CData, ri, rssc, NULL); - } - else if (((h->Flags & RSS_ATOM) != 0) && - (rssc->ItemType == RSS_ATOM)) - { - h->Handler(rssc->CData, ri, rssc, NULL); - } -#ifdef DEBUG_RSS - else - syslog(LOG_DEBUG, "RSS: END unhandled: [%s] [%s] = [%s]...\n", pel, supplied_el, ChrPtr(rssc->CData)); -#endif - } -#ifdef DEBUG_RSS - else - syslog(LOG_DEBUG, "RSS: END unhandled: [%s] [%s] = [%s]...\n", pel, supplied_el, ChrPtr(rssc->CData)); -#endif - FlushStrBuf(rssc->CData); - rssc->Current = NULL; + FreeStrBuf(&ri->linkTitle); + FreeStrBuf(&ri->reLink); + FreeStrBuf(&ri->reLinkTitle); + FreeStrBuf(&ri->channel_title); } - +/******************************************************************************* + * XML-Handler * + *******************************************************************************/ void RSS_item_rss_start (StrBuf *CData, rss_item *ri, rss_aggregator *Cfg, const char** Attr) @@ -581,9 +435,9 @@ void RSSATOM_item_ignore(StrBuf *CData, rss_item *ri, rss_aggregator *Cfg, const */ void rss_xml_cdata_start(void *data) { - rss_aggregator *rssc = (rss_aggregator*) data; + rss_aggregator *RSSAggr = (rss_aggregator*) data; - FlushStrBuf(rssc->CData); + FlushStrBuf(RSSAggr->CData); } void rss_xml_cdata_end(void *data) @@ -591,26 +445,369 @@ void rss_xml_cdata_end(void *data) } void rss_xml_chardata(void *data, const XML_Char *s, int len) { - rss_aggregator *rssc = (rss_aggregator*) data; + rss_aggregator *RSSAggr = (rss_aggregator*) data; + + StrBufAppendBufPlain (RSSAggr->CData, s, len, 0); +} + + +/******************************************************************************* + * RSS parser logic * + *******************************************************************************/ + +extern pthread_mutex_t RSSQueueMutex; + +HashList *StartHandlers = NULL; +HashList *EndHandlers = NULL; +HashList *KnownNameSpaces = NULL; + +void FreeNetworkSaveMessage (void *vMsg) +{ + networker_save_message *Msg = (networker_save_message *) vMsg; + + CtdlFreeMessageContents(&Msg->Msg); + FreeStrBuf(&Msg->Message); + FreeStrBuf(&Msg->MsgGUID); + free(Msg); +} + - StrBufAppendBufPlain (rssc->CData, s, len, 0); +void AppendLink(StrBuf *Message, + StrBuf *link, + StrBuf *LinkTitle, + const char *Title) +{ + if (StrLength(link) > 0) + { + StrBufAppendBufPlain(Message, HKEY(""), 0); + if (StrLength(LinkTitle) > 0) + StrBufAppendBuf(Message, LinkTitle, 0); + else if ((Title != NULL) && !IsEmptyStr(Title)) + StrBufAppendBufPlain(Message, Title, -1, 0); + else + StrBufAppendBuf(Message, link, 0); + StrBufAppendBufPlain(Message, HKEY("
\n"), 0); + } } /* - * Callback function for passing libcurl's output to expat for parsing + * Commit a fetched and parsed RSS item to disk */ +void rss_save_item(rss_item *ri, rss_aggregator *Cfg) +{ + networker_save_message *SaveMsg; + struct MD5Context md5context; + u_char rawdigest[MD5_DIGEST_LEN]; + int msglen = 0; + StrBuf *Message; + StrBuf *guid; + AsyncIO *IO = &Cfg->IO; + int n; + + + SaveMsg = (networker_save_message *) malloc( + sizeof(networker_save_message)); + memset(SaveMsg, 0, sizeof(networker_save_message)); + + /* Construct a GUID to use in the S_USETABLE table. + * If one is not present in the item itself, make one up. + */ + if (ri->guid != NULL) { + StrBufSpaceToBlank(ri->guid); + StrBufTrim(ri->guid); + guid = NewStrBufPlain(HKEY("rss/")); + StrBufAppendBuf(guid, ri->guid, 0); + } + else { + MD5Init(&md5context); + if (ri->title != NULL) { + MD5Update(&md5context, + (const unsigned char*)SKEY(ri->title)); + } + if (ri->link != NULL) { + MD5Update(&md5context, + (const unsigned char*)SKEY(ri->link)); + } + MD5Final(rawdigest, &md5context); + guid = NewStrBufPlain(NULL, + MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/); + StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN); + StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0); + } + + /* translate Item into message. */ + EVM_syslog(LOG_DEBUG, "RSS: translating item...\n"); + if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY("")); + StrBufSpaceToBlank(ri->description); + SaveMsg->Msg.cm_magic = CTDLMESSAGE_MAGIC; + SaveMsg->Msg.cm_anon_type = MES_NORMAL; + SaveMsg->Msg.cm_format_type = FMT_RFC822; + + if (ri->guid != NULL) { + SaveMsg->Msg.cm_fields['E'] = strdup(ChrPtr(ri->guid)); + } + + if (ri->author_or_creator != NULL) { + char *From; + StrBuf *Encoded = NULL; + int FromAt; + + From = html_to_ascii(ChrPtr(ri->author_or_creator), + StrLength(ri->author_or_creator), + 512, 0); + StrBufPlain(ri->author_or_creator, From, -1); + StrBufTrim(ri->author_or_creator); + free(From); + + FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL; + if (!FromAt && StrLength (ri->author_email) > 0) + { + StrBufRFC2047encode(&Encoded, ri->author_or_creator); + SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded); + SaveMsg->Msg.cm_fields['P'] = + SmashStrBuf(&ri->author_email); + } + else + { + if (FromAt) + { + SaveMsg->Msg.cm_fields['A'] = + SmashStrBuf(&ri->author_or_creator); + SaveMsg->Msg.cm_fields['P'] = + strdup(SaveMsg->Msg.cm_fields['A']); + } + else + { + StrBufRFC2047encode(&Encoded, + ri->author_or_creator); + SaveMsg->Msg.cm_fields['A'] = + SmashStrBuf(&Encoded); + SaveMsg->Msg.cm_fields['P'] = + strdup("rss@localhost"); + + } + if (ri->pubdate <= 0) { + ri->pubdate = time(NULL); + } + } + } + else { + SaveMsg->Msg.cm_fields['A'] = strdup("rss"); + } + + SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME); + if (ri->title != NULL) { + long len; + char *Sbj; + StrBuf *Encoded, *QPEncoded; + + QPEncoded = NULL; + StrBufSpaceToBlank(ri->title); + len = StrLength(ri->title); + Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0); + len = strlen(Sbj); + if (Sbj[len - 1] == '\n') + { + len --; + Sbj[len] = '\0'; + } + Encoded = NewStrBufPlain(Sbj, len); + free(Sbj); + + StrBufTrim(Encoded); + StrBufRFC2047encode(&QPEncoded, Encoded); + + SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded); + FreeStrBuf(&Encoded); + } + SaveMsg->Msg.cm_fields['T'] = malloc(64); + snprintf(SaveMsg->Msg.cm_fields['T'], 64, "%ld", ri->pubdate); + if (ri->channel_title != NULL) { + if (StrLength(ri->channel_title) > 0) { + SaveMsg->Msg.cm_fields['O'] = + strdup(ChrPtr(ri->channel_title)); + } + } + if (ri->link == NULL) + ri->link = NewStrBufPlain(HKEY("")); + +#if 0 /* temporarily disable shorter urls. */ + SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] = + GetShorterUrls(ri->description); +#endif + + msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ; + + Message = NewStrBufPlain(NULL, StrLength(ri->description)); + + StrBufPlain(Message, HKEY( + "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n" + "\n")); +#if 0 /* disable shorter url for now. */ + SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message); +#endif + StrBufAppendBuf(Message, ri->description, 0); + StrBufAppendBufPlain(Message, HKEY("

\n"), 0); + + AppendLink(Message, ri->link, ri->linkTitle, NULL); + AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this"); + StrBufAppendBufPlain(Message, HKEY("\n"), 0); + + SaveMsg->MsgGUID = guid; + SaveMsg->Message = Message; + + n = GetCount(Cfg->Messages) + 1; + Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage); +} + + +void rss_xml_start(void *data, const char *supplied_el, const char **attr) +{ + rss_xml_handler *h; + rss_aggregator *RSSAggr = (rss_aggregator*) data; + rss_item *ri = RSSAggr->Item; + void *pv; + const char *pel; + char *sep = NULL; + + /* Axe the namespace, we don't care about it */ +/// syslog(LOG_DEBUG, "RSS: supplied el %d: %s...\n", RSSAggr->Cfg->ItemType, supplied_el); + pel = supplied_el; + while (sep = strchr(pel, ':'), sep) { + pel = sep + 1; + } + + if (pel != supplied_el) + { + void *v; + + if (!GetHash(KnownNameSpaces, + supplied_el, + pel - supplied_el - 1, + &v)) + { +#ifdef DEBUG_RSS + syslog(LOG_DEBUG, "RSS: START ignoring because of wrong namespace [%s]\n", + supplied_el); +#endif + return; + } + } + + StrBufPlain(RSSAggr->Key, pel, -1); + StrBufLowerCase(RSSAggr->Key); + if (GetHash(StartHandlers, SKEY(RSSAggr->Key), &pv)) + { + h = (rss_xml_handler*) pv; + + if (((h->Flags & RSS_UNSET) != 0) && + (RSSAggr->ItemType == RSS_UNSET)) + { + h->Handler(RSSAggr->CData, ri, RSSAggr, attr); + } + else if (((h->Flags & RSS_RSS) != 0) && + (RSSAggr->ItemType == RSS_RSS)) + { + h->Handler(RSSAggr->CData, ri, RSSAggr, attr); + } + else if (((h->Flags & RSS_ATOM) != 0) && + (RSSAggr->ItemType == RSS_ATOM)) + { + h->Handler(RSSAggr->CData, ri, RSSAggr, attr); + } +#ifdef DEBUG_RSS + else + syslog(LOG_DEBUG, "RSS: START unhandled: [%s] [%s]...\n", pel, supplied_el); +#endif + } +#ifdef DEBUG_RSS + else + syslog(LOG_DEBUG, "RSS: START unhandled: [%s] [%s]...\n", pel, supplied_el); +#endif +} + +void rss_xml_end(void *data, const char *supplied_el) +{ + rss_xml_handler *h; + rss_aggregator *RSSAggr = (rss_aggregator*) data; + rss_item *ri = RSSAggr->Item; + const char *pel; + char *sep = NULL; + void *pv; + + /* Axe the namespace, we don't care about it */ + pel = supplied_el; + while (sep = strchr(pel, ':'), sep) { + pel = sep + 1; + } +// syslog(LOG_DEBUG, "RSS: END %s...\n", el); + if (pel != supplied_el) + { + void *v; + + if (!GetHash(KnownNameSpaces, + supplied_el, + pel - supplied_el - 1, + &v)) + { +#ifdef DEBUG_RSS + syslog(LOG_DEBUG, "RSS: END ignoring because of wrong namespace [%s] = [%s]\n", + supplied_el, ChrPtr(RSSAggr->CData)); +#endif + FlushStrBuf(RSSAggr->CData); + return; + } + } + + StrBufPlain(RSSAggr->Key, pel, -1); + StrBufLowerCase(RSSAggr->Key); + if (GetHash(EndHandlers, SKEY(RSSAggr->Key), &pv)) + { + h = (rss_xml_handler*) pv; + + if (((h->Flags & RSS_UNSET) != 0) && + (RSSAggr->ItemType == RSS_UNSET)) + { + h->Handler(RSSAggr->CData, ri, RSSAggr, NULL); + } + else if (((h->Flags & RSS_RSS) != 0) && + (RSSAggr->ItemType == RSS_RSS)) + { + h->Handler(RSSAggr->CData, ri, RSSAggr, NULL); + } + else if (((h->Flags & RSS_ATOM) != 0) && + (RSSAggr->ItemType == RSS_ATOM)) + { + h->Handler(RSSAggr->CData, ri, RSSAggr, NULL); + } +#ifdef DEBUG_RSS + else + syslog(LOG_DEBUG, "RSS: END unhandled: [%s] [%s] = [%s]...\n", pel, supplied_el, ChrPtr(RSSAggr->CData)); +#endif + } +#ifdef DEBUG_RSS + else + syslog(LOG_DEBUG, "RSS: END unhandled: [%s] [%s] = [%s]...\n", pel, supplied_el, ChrPtr(RSSAggr->CData)); +#endif + FlushStrBuf(RSSAggr->CData); +} + +/* + * Callback function for passing libcurl's output to expat for parsing + * we don't do streamed parsing so expat can handle non-utf8 documents size_t rss_libcurl_callback(void *ptr, size_t size, size_t nmemb, void *stream) { XML_Parse((XML_Parser)stream, ptr, (size * nmemb), 0); return (size*nmemb); } - - + */ eNextState RSSAggregator_ParseReply(AsyncIO *IO) { StrBuf *Buf; - rss_aggregator *rssc; + rss_aggregator *RSSAggr; rss_item *ri; const char *at; char *ptr; @@ -627,16 +824,16 @@ eNextState RSSAggregator_ParseReply(AsyncIO *IO) return eAbort; } - rssc = IO->Data; - ri = rssc->Item; - rssc->CData = NewStrBufPlain(NULL, SIZ); - rssc->Key = NewStrBuf(); + RSSAggr = IO->Data; + ri = RSSAggr->Item; + RSSAggr->CData = NewStrBufPlain(NULL, SIZ); + RSSAggr->Key = NewStrBuf(); at = NULL; - StrBufSipLine(rssc->Key, IO->HttpReq.ReplyData, &at); + StrBufSipLine(RSSAggr->Key, IO->HttpReq.ReplyData, &at); ptr = NULL; #define encoding "encoding=\"" - ptr = strstr(ChrPtr(rssc->Key), encoding); + ptr = strstr(ChrPtr(RSSAggr->Key), encoding); if (ptr != NULL) { char *pche; @@ -644,63 +841,82 @@ eNextState RSSAggregator_ParseReply(AsyncIO *IO) ptr += sizeof (encoding) - 1; pche = strchr(ptr, '"'); if (pche != NULL) - StrBufCutAt(rssc->Key, -1, pche); + StrBufCutAt(RSSAggr->Key, -1, pche); else ptr = "UTF-8"; } else ptr = "UTF-8"; - syslog(LOG_DEBUG, "RSS: Now parsing [%s] \n", ChrPtr(rssc->Url)); + syslog(LOG_DEBUG, "RSS: Now parsing [%s] \n", ChrPtr(RSSAggr->Url)); - rssc->xp = XML_ParserCreateNS(ptr, ':'); - if (!rssc->xp) { + RSSAggr->xp = XML_ParserCreateNS(ptr, ':'); + if (!RSSAggr->xp) { syslog(LOG_DEBUG, "Cannot create XML parser!\n"); return eAbort; } - FlushStrBuf(rssc->Key); + FlushStrBuf(RSSAggr->Key); - rssc->Messages = NewHash(1, Flathash); - XML_SetElementHandler(rssc->xp, rss_xml_start, rss_xml_end); - XML_SetCharacterDataHandler(rssc->xp, rss_xml_chardata); - XML_SetUserData(rssc->xp, rssc); - XML_SetCdataSectionHandler(rssc->xp, + RSSAggr->Messages = NewHash(1, Flathash); + XML_SetElementHandler(RSSAggr->xp, rss_xml_start, rss_xml_end); + XML_SetCharacterDataHandler(RSSAggr->xp, rss_xml_chardata); + XML_SetUserData(RSSAggr->xp, RSSAggr); + XML_SetCdataSectionHandler(RSSAggr->xp, rss_xml_cdata_start, rss_xml_cdata_end); len = StrLength(IO->HttpReq.ReplyData); ptr = SmashStrBuf(&IO->HttpReq.ReplyData); - XML_Parse(rssc->xp, ptr, len, 0); + XML_Parse(RSSAggr->xp, ptr, len, 0); free (ptr); if (ri->done_parsing == 0) - XML_Parse(rssc->xp, "", 0, 1); + XML_Parse(RSSAggr->xp, "", 0, 1); syslog(LOG_DEBUG, "RSS: XML Status [%s] \n", XML_ErrorString( - XML_GetErrorCode(rssc->xp))); + XML_GetErrorCode(RSSAggr->xp))); - XML_ParserFree(rssc->xp); + XML_ParserFree(RSSAggr->xp); flush_rss_item(ri); - FreeStrBuf(&rssc->CData); - FreeStrBuf(&rssc->Key); - Buf = NewStrBufDup(rssc->rooms); - rssc->recp.recp_room = SmashStrBuf(&Buf); - rssc->recp.num_room = rssc->roomlist_parts; - rssc->recp.recptypes_magic = RECPTYPES_MAGIC; + Buf = NewStrBufDup(RSSAggr->rooms); + RSSAggr->recp.recp_room = SmashStrBuf(&Buf); + RSSAggr->recp.num_room = RSSAggr->roomlist_parts; + RSSAggr->recp.recptypes_magic = RECPTYPES_MAGIC; - rssc->Pos = GetNewHashPos(rssc->Messages, 1); + RSSAggr->Pos = GetNewHashPos(RSSAggr->Messages, 1); ///Cfg->next_poll = time(NULL) + config.c_net_freq; - if (GetNextHashPos(rssc->Messages, rssc->Pos, &len, &Key, (void**) &rssc->ThisMsg)) + if (GetNextHashPos(RSSAggr->Messages, RSSAggr->Pos, &len, &Key, (void**) &RSSAggr->ThisMsg)) return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry); else return eAbort; } +/******************************************************************************* + * RSS handler registering logic * + *******************************************************************************/ + +void AddRSSStartHandler(rss_handler_func Handler, int Flags, const char *key, long len) +{ + rss_xml_handler *h; + h = (rss_xml_handler*) malloc(sizeof (rss_xml_handler)); + h->Flags = Flags; + h->Handler = Handler; + Put(StartHandlers, key, len, h, NULL); +} +void AddRSSEndHandler(rss_handler_func Handler, int Flags, const char *key, long len) +{ + rss_xml_handler *h; + h = (rss_xml_handler*) malloc(sizeof (rss_xml_handler)); + h->Flags = Flags; + h->Handler = Handler; + Put(EndHandlers, key, len, h, NULL); +} + void rss_parser_cleanup(void) { DeleteHash(&StartHandlers); @@ -718,7 +934,7 @@ CTDL_MODULE_INIT(rssparser) AddRSSStartHandler(RSS_item_rss_start, RSS_UNSET, HKEY("rss")); AddRSSStartHandler(RSS_item_rdf_start, RSS_UNSET, HKEY("rdf")); - AddRSSStartHandler(ATOM_item_feed_start, RSS_UNSET, HKEY("feed")); + AddRSSStartHandler(ATOM_item_feed_start, RSS_UNSET, HKEY("feed")); AddRSSStartHandler(RSS_item_item_start, RSS_RSS, HKEY("item")); AddRSSStartHandler(ATOM_item_entry_start, RSS_ATOM, HKEY("entry")); AddRSSStartHandler(ATOM_item_link_start, RSS_ATOM, HKEY("link")); diff --git a/citadel/modules/rssclient/rss_atom_parser.h b/citadel/modules/rssclient/rss_atom_parser.h index da02808a2..63eef81bd 100644 --- a/citadel/modules/rssclient/rss_atom_parser.h +++ b/citadel/modules/rssclient/rss_atom_parser.h @@ -55,6 +55,7 @@ struct rss_item { StrBuf *author_url; StrBuf *author_email; }; +void flush_rss_item(rss_item *ri); struct rss_room_counter { int count; @@ -90,13 +91,10 @@ struct rss_aggregator { HashPos *Pos; HashList *Messages; networker_save_message *ThisMsg; - const rss_xml_handler *Current; }; eNextState RSSAggregator_ParseReply(AsyncIO *IO); -void rss_save_item(rss_item *ri, rss_aggregator *Cfg); - eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO); diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index 530bed03f..9f27b307f 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -73,25 +73,6 @@ eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO); struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; -void AppendLink(StrBuf *Message, - StrBuf *link, - StrBuf *LinkTitle, - const char *Title) -{ - if (StrLength(link) > 0) - { - StrBufAppendBufPlain(Message, HKEY(""), 0); - if (StrLength(LinkTitle) > 0) - StrBufAppendBuf(Message, LinkTitle, 0); - else if ((Title != NULL) && !IsEmptyStr(Title)) - StrBufAppendBufPlain(Message, Title, -1, 0); - else - StrBufAppendBuf(Message, link, 0); - StrBufAppendBufPlain(Message, HKEY("
\n"), 0); - } -} void DeleteRoomReference(long QRnumber) @@ -159,16 +140,63 @@ void UnlinkRSSAggregator(rss_aggregator *Cfg) last_run = time(NULL); } -void FreeNetworkSaveMessage (void *vMsg) + +void DeleteRssCfg(void *vptr) { - networker_save_message *Msg = (networker_save_message *) vMsg; + rss_aggregator *RSSAggr = (rss_aggregator *)vptr; + AsyncIO *IO = &RSSAggr->IO; + EVM_syslog(LOG_DEBUG, "RSS: destroying\n"); + + FreeStrBuf(&RSSAggr->Url); + FreeStrBuf(&RSSAggr->rooms); + FreeStrBuf(&RSSAggr->CData); + FreeStrBuf(&RSSAggr->Key); + DeleteHash(&RSSAggr->OtherQRnumbers); + + DeleteHashPos (&RSSAggr->Pos); + DeleteHash (&RSSAggr->Messages); + if (RSSAggr->recp.recp_room != NULL) + free(RSSAggr->recp.recp_room); + + + if (RSSAggr->Item != NULL) + { + flush_rss_item(RSSAggr->Item); + + free(RSSAggr->Item); + } - CtdlFreeMessageContents(&Msg->Msg); - FreeStrBuf(&Msg->Message); - FreeStrBuf(&Msg->MsgGUID); - free(Msg); + FreeAsyncIOContents(&RSSAggr->IO); + free(RSSAggr); } +eNextState RSSAggregator_Terminate(AsyncIO *IO) +{ + rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data; + + EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n"); + + + UnlinkRSSAggregator(RSSAggr); + return eAbort; +} +eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO) +{ + const char *pUrl; + rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data; + + pUrl = IO->ConnectMe->PlainUrl; + if (pUrl == NULL) + pUrl = ""; + + EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl); + + + UnlinkRSSAggregator(RSSAggr); + return eAbort; +} + + eNextState AbortNetworkSaveMessage (AsyncIO *IO) { return eAbort; ///TODO @@ -178,22 +206,22 @@ eNextState RSSSaveMessage(AsyncIO *IO) { long len; const char *Key; - rss_aggregator *Ctx = (rss_aggregator *) IO->Data; + rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data; - Ctx->ThisMsg->Msg.cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message); + RSSAggr->ThisMsg->Msg.cm_fields['M'] = SmashStrBuf(&RSSAggr->ThisMsg->Message); - CtdlSubmitMsg(&Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0); + CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0); /* write the uidl to the use table so we don't store this item again */ cdb_store(CDB_USETABLE, - SKEY(Ctx->ThisMsg->MsgGUID), - &Ctx->ThisMsg->ut, + SKEY(RSSAggr->ThisMsg->MsgGUID), + &RSSAggr->ThisMsg->ut, sizeof(struct UseTable) ); - if (GetNextHashPos(Ctx->Messages, - Ctx->Pos, + if (GetNextHashPos(RSSAggr->Messages, + RSSAggr->Pos, &len, &Key, - (void**) &Ctx->ThisMsg)) + (void**) &RSSAggr->ThisMsg)) return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry); else return eAbort; @@ -243,176 +271,6 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) } } -/* - * Commit a fetched and parsed RSS item to disk - */ -void rss_save_item(rss_item *ri, rss_aggregator *Cfg) -{ - networker_save_message *SaveMsg; - struct MD5Context md5context; - u_char rawdigest[MD5_DIGEST_LEN]; - int msglen = 0; - StrBuf *Message; - StrBuf *guid; - AsyncIO *IO = &Cfg->IO; - int n; - - - SaveMsg = (networker_save_message *) malloc( - sizeof(networker_save_message)); - memset(SaveMsg, 0, sizeof(networker_save_message)); - - /* Construct a GUID to use in the S_USETABLE table. - * If one is not present in the item itself, make one up. - */ - if (ri->guid != NULL) { - StrBufSpaceToBlank(ri->guid); - StrBufTrim(ri->guid); - guid = NewStrBufPlain(HKEY("rss/")); - StrBufAppendBuf(guid, ri->guid, 0); - } - else { - MD5Init(&md5context); - if (ri->title != NULL) { - MD5Update(&md5context, - (const unsigned char*)SKEY(ri->title)); - } - if (ri->link != NULL) { - MD5Update(&md5context, - (const unsigned char*)SKEY(ri->link)); - } - MD5Final(rawdigest, &md5context); - guid = NewStrBufPlain(NULL, - MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/); - StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN); - StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0); - } - - /* translate Item into message. */ - EVM_syslog(LOG_DEBUG, "RSS: translating item...\n"); - if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY("")); - StrBufSpaceToBlank(ri->description); - SaveMsg->Msg.cm_magic = CTDLMESSAGE_MAGIC; - SaveMsg->Msg.cm_anon_type = MES_NORMAL; - SaveMsg->Msg.cm_format_type = FMT_RFC822; - - if (ri->guid != NULL) { - SaveMsg->Msg.cm_fields['E'] = strdup(ChrPtr(ri->guid)); - } - - if (ri->author_or_creator != NULL) { - char *From; - StrBuf *Encoded = NULL; - int FromAt; - - From = html_to_ascii(ChrPtr(ri->author_or_creator), - StrLength(ri->author_or_creator), - 512, 0); - StrBufPlain(ri->author_or_creator, From, -1); - StrBufTrim(ri->author_or_creator); - free(From); - - FromAt = strchr(ChrPtr(ri->author_or_creator), '@') != NULL; - if (!FromAt && StrLength (ri->author_email) > 0) - { - StrBufRFC2047encode(&Encoded, ri->author_or_creator); - SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded); - SaveMsg->Msg.cm_fields['P'] = - SmashStrBuf(&ri->author_email); - } - else - { - if (FromAt) - { - SaveMsg->Msg.cm_fields['A'] = - SmashStrBuf(&ri->author_or_creator); - SaveMsg->Msg.cm_fields['P'] = - strdup(SaveMsg->Msg.cm_fields['A']); - } - else - { - StrBufRFC2047encode(&Encoded, - ri->author_or_creator); - SaveMsg->Msg.cm_fields['A'] = - SmashStrBuf(&Encoded); - SaveMsg->Msg.cm_fields['P'] = - strdup("rss@localhost"); - - } - if (ri->pubdate <= 0) { - ri->pubdate = time(NULL); - } - } - } - else { - SaveMsg->Msg.cm_fields['A'] = strdup("rss"); - } - - SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME); - if (ri->title != NULL) { - long len; - char *Sbj; - StrBuf *Encoded, *QPEncoded; - - QPEncoded = NULL; - StrBufSpaceToBlank(ri->title); - len = StrLength(ri->title); - Sbj = html_to_ascii(ChrPtr(ri->title), len, 512, 0); - len = strlen(Sbj); - if (Sbj[len - 1] == '\n') - { - len --; - Sbj[len] = '\0'; - } - Encoded = NewStrBufPlain(Sbj, len); - free(Sbj); - - StrBufTrim(Encoded); - StrBufRFC2047encode(&QPEncoded, Encoded); - - SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded); - FreeStrBuf(&Encoded); - } - SaveMsg->Msg.cm_fields['T'] = malloc(64); - snprintf(SaveMsg->Msg.cm_fields['T'], 64, "%ld", ri->pubdate); - if (ri->channel_title != NULL) { - if (StrLength(ri->channel_title) > 0) { - SaveMsg->Msg.cm_fields['O'] = - strdup(ChrPtr(ri->channel_title)); - } - } - if (ri->link == NULL) - ri->link = NewStrBufPlain(HKEY("")); - -#if 0 /* temporarily disable shorter urls. */ - SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] = - GetShorterUrls(ri->description); -#endif - - msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ; - - Message = NewStrBufPlain(NULL, StrLength(ri->description)); - - StrBufPlain(Message, HKEY( - "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n" - "\n")); -#if 0 /* disable shorter url for now. */ - SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message); -#endif - StrBufAppendBuf(Message, ri->description, 0); - StrBufAppendBufPlain(Message, HKEY("

\n"), 0); - - AppendLink(Message, ri->link, ri->linkTitle, NULL); - AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this"); - StrBufAppendBufPlain(Message, HKEY("\n"), 0); - - SaveMsg->MsgGUID = guid; - SaveMsg->Message = Message; - - n = GetCount(Cfg->Messages) + 1; - Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage); -} - /* @@ -455,72 +313,6 @@ int rss_do_fetching(rss_aggregator *Cfg) return 1; } - -void DeleteRssCfg(void *vptr) -{ - rss_aggregator *rncptr = (rss_aggregator *)vptr; - AsyncIO *IO = &rncptr->IO; - EVM_syslog(LOG_DEBUG, "RSS: destroying\n"); - - FreeStrBuf(&rncptr->Url); - FreeStrBuf(&rncptr->rooms); - FreeStrBuf(&rncptr->CData); - FreeStrBuf(&rncptr->Key); - FreeStrBuf(&rncptr->IO.HttpReq.ReplyData); - DeleteHash(&rncptr->OtherQRnumbers); - FreeURL(&rncptr->IO.ConnectMe); - - DeleteHashPos (&rncptr->Pos); - DeleteHash (&rncptr->Messages); - if (rncptr->recp.recp_room != NULL) - free(rncptr->recp.recp_room); - - - if (rncptr->Item != NULL) - { - FreeStrBuf(&rncptr->Item->guid); - FreeStrBuf(&rncptr->Item->title); - FreeStrBuf(&rncptr->Item->link); - FreeStrBuf(&rncptr->Item->linkTitle); - FreeStrBuf(&rncptr->Item->reLink); - FreeStrBuf(&rncptr->Item->reLinkTitle); - FreeStrBuf(&rncptr->Item->description); - FreeStrBuf(&rncptr->Item->channel_title); - FreeStrBuf(&rncptr->Item->author_or_creator); - FreeStrBuf(&rncptr->Item->author_url); - FreeStrBuf(&rncptr->Item->author_email); - - free(rncptr->Item); - } - free(rncptr); -} - -eNextState RSSAggregator_Terminate(AsyncIO *IO) -{ - rss_aggregator *rncptr = (rss_aggregator *)IO->Data; - - EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n"); - - - UnlinkRSSAggregator(rncptr); - return eAbort; -} -eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO) -{ - const char *pUrl; - rss_aggregator *rncptr = (rss_aggregator *)IO->Data; - - pUrl = IO->ConnectMe->PlainUrl; - if (pUrl == NULL) - pUrl = ""; - - EV_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl); - - - UnlinkRSSAggregator(rncptr); - return eAbort; -} - /* * Scan a room's netconfig to determine whether it is requesting any RSS feeds */ @@ -534,8 +326,8 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) char filename[PATH_MAX]; int fd; int Done; - rss_aggregator *rncptr = NULL; - rss_aggregator *use_this_rncptr = NULL; + rss_aggregator *RSSAggr = NULL; + rss_aggregator *use_this_RSSAggr = NULL; void *vptr; const char *CfgPtr, *lPtr; const char *Err; @@ -612,49 +404,49 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) Count->count = 0; } Count->count ++; - rncptr = (rss_aggregator *) malloc(sizeof(rss_aggregator)); - memset (rncptr, 0, sizeof(rss_aggregator)); - rncptr->roomlist_parts = 1; - rncptr->Url = NewStrBuf(); - StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|'); + RSSAggr = (rss_aggregator *) malloc(sizeof(rss_aggregator)); + memset (RSSAggr, 0, sizeof(rss_aggregator)); + RSSAggr->roomlist_parts = 1; + RSSAggr->Url = NewStrBuf(); + StrBufExtract_NextToken(RSSAggr->Url, Line, &lPtr, '|'); pthread_mutex_lock(&RSSQueueMutex); - GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr); - use_this_rncptr = (rss_aggregator *)vptr; - if (use_this_rncptr != NULL) + GetHash(RSSFetchUrls, SKEY(RSSAggr->Url), &vptr); + use_this_RSSAggr = (rss_aggregator *)vptr; + if (use_this_RSSAggr != NULL) { long *QRnumber; - StrBufAppendBufPlain(use_this_rncptr->rooms, + StrBufAppendBufPlain(use_this_RSSAggr->rooms, qrbuf->QRname, -1, 0); - if (use_this_rncptr->roomlist_parts == 1) + if (use_this_RSSAggr->roomlist_parts == 1) { - use_this_rncptr->OtherQRnumbers = + use_this_RSSAggr->OtherQRnumbers = NewHash(1, lFlathash); } QRnumber = (long*)malloc(sizeof(long)); *QRnumber = qrbuf->QRnumber; - Put(use_this_rncptr->OtherQRnumbers, + Put(use_this_RSSAggr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); - use_this_rncptr->roomlist_parts++; + use_this_RSSAggr->roomlist_parts++; pthread_mutex_unlock(&RSSQueueMutex); - FreeStrBuf(&rncptr->Url); - free(rncptr); - rncptr = NULL; + FreeStrBuf(&RSSAggr->Url); + free(RSSAggr); + RSSAggr = NULL; continue; } pthread_mutex_unlock(&RSSQueueMutex); - rncptr->ItemType = RSS_UNSET; + RSSAggr->ItemType = RSS_UNSET; - rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1); + RSSAggr->rooms = NewStrBufPlain(qrbuf->QRname, -1); pthread_mutex_lock(&RSSQueueMutex); - Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg); + Put(RSSFetchUrls, SKEY(RSSAggr->Url), RSSAggr, DeleteRssCfg); pthread_mutex_unlock(&RSSQueueMutex); } } diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index f3a1566bd..664dec247 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -95,6 +95,9 @@ void DeleteSmtpOutMsg(void *v) { SmtpOutMsg *Msg = v; + /* these are kept in our own space and free'd below */ + Msg->IO.ConnectMe = NULL; + ares_free_data(Msg->AllMX); if (Msg->HostLookup.VParsedDNSReply != NULL) Msg->HostLookup.DNSReplyFree(Msg->HostLookup.VParsedDNSReply); -- 2.30.2