From c855d497545dad80942a194624c111a54cd1fdc7 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sun, 25 Dec 2011 20:29:57 +0100 Subject: [PATCH] Straightn http client generation - split CtdlFreeMessage into CtdlFreeMessageContents for places where CtdlMessage isn't kept as independent memory - move stuff about setting function pointers over to event_client.c - move stuff from evcurl_init over to evcurl_handle_start that depend on write by the client - call InitcURLIOStruct() way up than we did with evcurl_init () --- citadel/event_client.c | 26 +- citadel/event_client.h | 14 +- .../modules/eventclient/serv_eventclient.c | 62 ++-- citadel/modules/extnotify/extnotify_main.c | 8 +- citadel/modules/extnotify/funambol65.c | 171 ++++++----- citadel/modules/rssclient/rss_atom_parser.c | 2 +- citadel/modules/rssclient/rss_atom_parser.h | 4 +- citadel/modules/rssclient/serv_rssclient.c | 283 ++++++++---------- citadel/msgbase.c | 20 +- citadel/msgbase.h | 1 + 10 files changed, 306 insertions(+), 285 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index 5f418fa6a..cdcee9f6e 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -693,7 +693,6 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) ShutDownCLient(IO); default: break; - } default: break; @@ -876,3 +875,28 @@ void InitIOStruct(AsyncIO *IO, IO->IOBuf = NewStrBuf(); } + +extern int evcurl_init(AsyncIO *IO); + +int InitcURLIOStruct(AsyncIO *IO, + void *Data, + const char* Desc, + IO_CallBack SendDone, + IO_CallBack Terminate, + IO_CallBack ShutdownAbort) +{ + IO->Data = Data; + + IO->CitContext = CloneContext(CC); + ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->ShutdownAbort = ShutdownAbort; + + strcpy(IO->HttpReq.errdesc, Desc); + + + return evcurl_init(IO); + +} diff --git a/citadel/event_client.h b/citadel/event_client.h index c949a6ce9..9c4fd2ba1 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -193,13 +193,6 @@ void InitC_ares_dns(AsyncIO *IO); syslog(LOG_ERR, "error setting option " #s " on curl handle: %s", curl_easy_strerror(sta)); \ } } while (0) -int evcurl_init(AsyncIO *IO, - void *CustomData, - const char* Desc, - IO_CallBack CallBack, - IO_CallBack Terminate, - IO_CallBack ShutdownAbort); - void InitIOStruct(AsyncIO *IO, void *Data, eNextState NextState, @@ -212,6 +205,13 @@ void InitIOStruct(AsyncIO *IO, IO_CallBack Timeout, IO_CallBack ShutdownAbort); +int InitcURLIOStruct(AsyncIO *IO, + void *Data, + const char* Desc, + IO_CallBack SendDone, + IO_CallBack Terminate, + IO_CallBack ShutdownAbort); + eNextState ReAttachIO(AsyncIO *IO, void *pData, int ReadFirst); diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 53cbea4bd..1be3a8641 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -326,33 +326,20 @@ void curl_init_connectionpool(void) return; } - - - -int evcurl_init(AsyncIO *IO, - void *CustomData, - const char* Desc, - IO_CallBack CallBack, - IO_CallBack Terminate, - IO_CallBack ShutdownAbort) +int evcurl_init(AsyncIO *IO) { CURLcode sta; CURL *chnd; EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n"); IO->HttpReq.attached = 0; - IO->SendDone = CallBack; - IO->Terminate = Terminate; - IO->ShutdownAbort = ShutdownAbort; chnd = IO->HttpReq.chnd = curl_easy_init(); if (!chnd) { EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n"); - return 1; + return 0; } - strcpy(IO->HttpReq.errdesc, Desc); - OPT(VERBOSE, (long)1); /* unset in production */ OPT(NOPROGRESS, 1L); @@ -382,32 +369,13 @@ int evcurl_init(AsyncIO *IO, ) { OPT(INTERFACE, config.c_ip_addr); } - /* point to a structure that points back to the perl structure and stuff */ - EV_syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); - OPT(URL, IO->ConnectMe->PlainUrl); - if (StrLength(IO->ConnectMe->CurlCreds)) - { - OPT(HTTPAUTH, (long)CURLAUTH_BASIC); - OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds)); - } + #ifdef CURLOPT_HTTP_CONTENT_DECODING OPT(HTTP_CONTENT_DECODING, 1); OPT(ENCODING, ""); #endif - if (StrLength(IO->HttpReq.PostData) > 0) - { - OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData)); - OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData)); - - } - else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL)) - { - OPT(POSTFIELDS, IO->HttpReq.PlainPostData); - OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen); - } IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Connection: close"); - OPT(HTTPHEADER, IO->HttpReq.headers); return 1; } @@ -436,6 +404,30 @@ eNextState evcurl_handle_start(AsyncIO *IO) { CURLMcode msta; + CURLcode sta; + CURL *chnd; + + chnd = IO->HttpReq.chnd; + EV_syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); + OPT(URL, IO->ConnectMe->PlainUrl); + if (StrLength(IO->ConnectMe->CurlCreds)) + { + OPT(HTTPAUTH, (long)CURLAUTH_BASIC); + OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds)); + } + if (StrLength(IO->HttpReq.PostData) > 0) + { + OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData)); + OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData)); + + } + else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL)) + { + OPT(POSTFIELDS, IO->HttpReq.PlainPostData); + OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen); + } + OPT(HTTPHEADER, IO->HttpReq.headers); + IO->NextState = eConnect; EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n"); msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd); diff --git a/citadel/modules/extnotify/extnotify_main.c b/citadel/modules/extnotify/extnotify_main.c index 616209f3c..302932295 100644 --- a/citadel/modules/extnotify/extnotify_main.c +++ b/citadel/modules/extnotify/extnotify_main.c @@ -284,9 +284,6 @@ void process_notify(long NotifyMsgnum, void *usrdata) config.c_funambol_port, FUNAMBOL_WS); - SubC = CloneContext (CC); - SubC->session_specific_data = NULL;// (char*) DupNotifyContext(Ctx); - notify_http_server(remoteurl, file_funambol_msg, strlen(file_funambol_msg),/*GNA*/ @@ -319,8 +316,6 @@ void process_notify(long NotifyMsgnum, void *usrdata) FlushStrBuf(FileBuf); memcpy(URLBuf, ChrPtr(URL), StrLength(URL) + 1); - SubC = CloneContext (CC); - SubC->session_specific_data = NULL;// (char*) DupNotifyContext(Ctx); notify_http_server(URLBuf, ChrPtr(FileBuf), StrLength(FileBuf), @@ -371,7 +366,6 @@ void do_extnotify_queue(void) * don't really require extremely fine granularity here, we'll do it * with a static variable instead. */ - if (IsEmptyStr(config.c_pager_program) && IsEmptyStr(config.c_funambol_host)) { @@ -382,6 +376,8 @@ void do_extnotify_queue(void) if (doing_queue) return; doing_queue = 1; + become_session(&extnotify_queue_CC); + pthread_setspecific(MyConKey, (void *)&extnotify_queue_CC); /* diff --git a/citadel/modules/extnotify/funambol65.c b/citadel/modules/extnotify/funambol65.c index 2d65f20ef..bff95fbb4 100644 --- a/citadel/modules/extnotify/funambol65.c +++ b/citadel/modules/extnotify/funambol65.c @@ -52,16 +52,16 @@ eNextState ExtNotifyTerminate(AsyncIO *IO); eNextState ExtNotifyShutdownAbort(AsyncIO *IO); /* -* \brief Sends a message to the Funambol server notifying +* \brief Sends a message to the Funambol server notifying * of new mail for a user * Returns 0 if unsuccessful */ -int notify_http_server(char *remoteurl, - const char* template, long tlen, +int notify_http_server(char *remoteurl, + const char* template, long tlen, char *user, - char *msgid, - long MsgNum, - NotifyContext *Ctx) + char *msgid, + long MsgNum, + NotifyContext *Ctx) { CURLcode sta; char msgnumstr[128]; @@ -75,7 +75,17 @@ int notify_http_server(char *remoteurl, IO = (AsyncIO*) malloc(sizeof(AsyncIO)); memset(IO, 0, sizeof(AsyncIO)); - IO->CitContext = CloneContext(CC); + + if (! InitcURLIOStruct(IO, + NULL, /* we don't have personal data anymore. */ + "Citadel ExtNotify", + EvaluateResult, + ExtNotifyTerminate, + ExtNotifyShutdownAbort)) + { + syslog(LOG_ALERT, "Unable to initialize libcurl.\n"); + goto abort; + } snprintf(msgnumstr, 128, "%ld", MsgNum); @@ -88,12 +98,17 @@ int notify_http_server(char *remoteurl, if (Ftemplate == NULL) { char buf[SIZ]; - snprintf(buf, SIZ, - "Cannot load template file %s [%s]won't send notification\r\n", - file_funambol_msg, strerror(errno)); + snprintf(buf, SIZ, + "Cannot load template file %s [%s] " + "won't send notification\r\n", + file_funambol_msg, + strerror(errno)); syslog(LOG_ERR, "%s", buf); - - CtdlAideMessage(buf, "External notifier unable to find message template!"); + // TODO: once an hour! + CtdlAideMessage( + buf, + "External notifier: " + "unable to find message template!"); goto abort; } mimetype = GuessMimeByFilename(template, tlen); @@ -102,38 +117,55 @@ int notify_http_server(char *remoteurl, memset(buf, 0, SIZ); SOAPMessage = malloc(3072); memset(SOAPMessage, 0, 3072); - + while(fgets(buf, SIZ, Ftemplate) != NULL) { strcat(SOAPMessage, buf); } fclose(Ftemplate); - + if (strlen(SOAPMessage) < 0) { char buf[SIZ]; - snprintf(buf, SIZ, - "Cannot load template file %s; won't send notification\r\n", + snprintf(buf, SIZ, + "Cannot load template file %s;" + " won't send notification\r\n", file_funambol_msg); syslog(LOG_ERR, "%s", buf); - CtdlAideMessage(buf, "External notifier unable to load message template!"); + CtdlAideMessage(buf, "External notifier: " + "unable to load message template!"); goto abort; } // Do substitutions help_subst(SOAPMessage, "^notifyuser", user); - help_subst(SOAPMessage, "^syncsource", config.c_funambol_source); + help_subst(SOAPMessage, "^syncsource", + config.c_funambol_source); help_subst(SOAPMessage, "^msgid", msgid); help_subst(SOAPMessage, "^msgnum", msgnumstr); /* pass our list of custom made headers */ contenttype=(char*) malloc(40+strlen(mimetype)); - sprintf(contenttype,"Content-Type: %s; charset=utf-8", mimetype); + sprintf(contenttype, + "Content-Type: %s; charset=utf-8", + mimetype); + + IO->HttpReq.headers = curl_slist_append( + IO->HttpReq.headers, + "SOAPAction: \"\""); + + IO->HttpReq.headers = curl_slist_append( + IO->HttpReq.headers, + contenttype); - IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "SOAPAction: \"\""); - IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, contenttype); - IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Accept: application/soap+xml, application/mime, multipart/related, text/*"); - IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Pragma: no-cache"); + IO->HttpReq.headers = curl_slist_append( + IO->HttpReq.headers, + "Accept: application/soap+xml, " + "application/mime, multipart/related, text/*"); + + IO->HttpReq.headers = curl_slist_append( + IO->HttpReq.headers, + "Pragma: no-cache"); /* Now specify the POST binary data */ IO->HttpReq.PlainPostData = SOAPMessage; @@ -144,48 +176,31 @@ int notify_http_server(char *remoteurl, help_subst(remoteurl, "^syncsource", config.c_funambol_source); help_subst(remoteurl, "^msgid", msgid); help_subst(remoteurl, "^msgnum", msgnumstr); - IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Accept: application/soap+xml, application/mime, multipart/related, text/*"); - IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Pragma: no-cache"); + + IO->HttpReq.headers = curl_slist_append( + IO->HttpReq.headers, + "Accept: application/soap+xml, " + "application/mime, multipart/related, text/*"); + + IO->HttpReq.headers = curl_slist_append( + IO->HttpReq.headers, + "Pragma: no-cache"); } Buf = NewStrBufPlain (remoteurl, -1); ParseURL(&IO->ConnectMe, Buf, 80); FreeStrBuf(&Buf); /* TODO: this is uncool... */ CurlPrepareURL(IO->ConnectMe); - if (! evcurl_init(IO, -// Ctx, - NULL, - "Citadel ExtNotify", - EvaluateResult, - ExtNotifyTerminate, - ExtNotifyShutdownAbort)) - { - syslog(LOG_ALERT, "Unable to initialize libcurl.\n"); - goto abort; - } + chnd = IO->HttpReq.chnd; OPT(SSL_VERIFYPEER, 0); OPT(SSL_VERIFYHOST, 0); -/* - ReplyBuf = NewStrBuf(); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, ReplyBuf); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CurlFillStrBuf_callback); -*/ - if ( - (!IsEmptyStr(config.c_ip_addr)) - && (strcmp(config.c_ip_addr, "*")) - && (strcmp(config.c_ip_addr, "::")) - && (strcmp(config.c_ip_addr, "0.0.0.0")) - ) { - OPT(INTERFACE, config.c_ip_addr); - } QueueCurlContext(IO); return 0; abort: -/// curl_slist_free_all (headers); -/// curl_easy_cleanup(curl); + if (contenttype) free(contenttype); if (SOAPMessage != NULL) free(SOAPMessage); if (buf != NULL) free(buf); @@ -200,31 +215,47 @@ eNextState EvaluateResult(AsyncIO *IO) if (IO->HttpReq.httpcode != 200) { StrBuf *ErrMsg; - syslog(LOG_ALERT, "libcurl error %ld: %s\n", - IO->HttpReq.httpcode, + syslog(LOG_ALERT, "libcurl error %ld: %s\n", + IO->HttpReq.httpcode, IO->HttpReq.errdesc); - ErrMsg = NewStrBufPlain(HKEY("Error sending your Notification\n")); - StrBufAppendPrintf(ErrMsg, "\nlibcurl error %ld: \n\t\t%s\n", - IO->HttpReq.httpcode, + ErrMsg = NewStrBufPlain( + HKEY("Error sending your Notification\n")); + StrBufAppendPrintf(ErrMsg, "\nlibcurl error %ld: \n\t\t%s\n", + IO->HttpReq.httpcode, IO->HttpReq.errdesc); - StrBufAppendBufPlain(ErrMsg, HKEY("\nWas Trying to send: \n"), 0); + + StrBufAppendBufPlain(ErrMsg, + HKEY("\nWas Trying to send: \n"), + 0); + StrBufAppendBufPlain(ErrMsg, IO->ConnectMe->PlainUrl, -1, 0); if (IO->HttpReq.PlainPostDataLen > 0) { - StrBufAppendBufPlain(ErrMsg, HKEY("\nThe Post document was: \n"), 0); - StrBufAppendBufPlain(ErrMsg, - IO->HttpReq.PlainPostData, + StrBufAppendBufPlain( + ErrMsg, + HKEY("\nThe Post document was: \n"), + 0); + StrBufAppendBufPlain(ErrMsg, + IO->HttpReq.PlainPostData, IO->HttpReq.PlainPostDataLen, 0); - StrBufAppendBufPlain(ErrMsg, HKEY("\n\n"), 0); + StrBufAppendBufPlain(ErrMsg, HKEY("\n\n"), 0); } - if (StrLength(IO->HttpReq.ReplyData) > 0) { - StrBufAppendBufPlain(ErrMsg, HKEY("\n\nThe Serverreply was: \n\n"), 0); + if (StrLength(IO->HttpReq.ReplyData) > 0) { + StrBufAppendBufPlain( + ErrMsg, + HKEY("\n\nThe Serverreply was: \n\n"), + 0); StrBufAppendBuf(ErrMsg, IO->HttpReq.ReplyData, 0); } - else - StrBufAppendBufPlain(ErrMsg, HKEY("\n\nThere was no Serverreply.\n\n"), 0); + else + StrBufAppendBufPlain( + ErrMsg, + HKEY("\n\nThere was no Serverreply.\n\n"), + 0); ///ExtNotify_PutErrorMessage(Ctx, ErrMsg); - CtdlAideMessage(ChrPtr(ErrMsg), "External notifier unable to load message template!"); + CtdlAideMessage(ChrPtr(ErrMsg), + "External notifier: " + "unable to contact notification host!"); } syslog(LOG_DEBUG, "Funambol notified\n"); @@ -241,11 +272,13 @@ eNextState EvaluateResult(AsyncIO *IO) StrBuf *ErrMsg; It = GetNewHashPos(Ctx.NotifyErrors, 0); - while (GetNextHashPos(Ctx.NotifyErrors, It, &len, &Key, &vErr) && + while (GetNextHashPos(Ctx.NotifyErrors, + It, &len, &Key, &vErr) && (vErr != NULL)) { ErrMsg = (StrBuf*) vErr; - quickie_message("Citadel", NULL, NULL, AIDEROOM, ChrPtr(ErrMsg), FMT_FIXED, - "Failed to notify external service about inbound mail"); + quickie_message("Citadel", NULL, NULL, + AIDEROOM, ChrPtr(ErrMsg), FMT_FIXED, + "Failed to notify external service about inbound mail"); } DeleteHashPos(&It); diff --git a/citadel/modules/rssclient/rss_atom_parser.c b/citadel/modules/rssclient/rss_atom_parser.c index 84747d5d4..e7065bde3 100644 --- a/citadel/modules/rssclient/rss_atom_parser.c +++ b/citadel/modules/rssclient/rss_atom_parser.c @@ -607,7 +607,7 @@ size_t rss_libcurl_callback(void *ptr, size_t size, size_t nmemb, void *stream) -eNextState ParseRSSReply(AsyncIO *IO) +eNextState RSSAggregator_ParseReply(AsyncIO *IO) { StrBuf *Buf; rss_aggregator *rssc; diff --git a/citadel/modules/rssclient/rss_atom_parser.h b/citadel/modules/rssclient/rss_atom_parser.h index 482c7c627..da02808a2 100644 --- a/citadel/modules/rssclient/rss_atom_parser.h +++ b/citadel/modules/rssclient/rss_atom_parser.h @@ -62,7 +62,7 @@ struct rss_room_counter { }; typedef struct __networker_save_message { - struct CtdlMessage *Msg; + struct CtdlMessage Msg; StrBuf *MsgGUID; StrBuf *Message; struct UseTable ut; @@ -95,7 +95,7 @@ struct rss_aggregator { -eNextState ParseRSSReply(AsyncIO *IO); +eNextState RSSAggregator_ParseReply(AsyncIO *IO); void rss_save_item(rss_item *ri, rss_aggregator *Cfg); diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index db6d37fb0..530bed03f 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -66,14 +66,17 @@ time_t last_run = 0L; pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ HashList *RSSQueueRooms = NULL; /* rss_room_counter */ -HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */ +HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/ -eNextState RSSAggregatorTerminate(AsyncIO *IO); -eNextState RSSAggregatorShutdownAbort(AsyncIO *IO); +eNextState RSSAggregator_Terminate(AsyncIO *IO); +eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO); struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; -void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Title) +void AppendLink(StrBuf *Message, + StrBuf *link, + StrBuf *LinkTitle, + const char *Title) { if (StrLength(link) > 0) { @@ -117,7 +120,6 @@ void DeleteRoomReference(long QRnumber) void UnlinkRooms(rss_aggregator *Cfg) { - DeleteRoomReference(Cfg->QRnumber); if (Cfg->OtherQRnumbers != NULL) { @@ -127,15 +129,16 @@ void UnlinkRooms(rss_aggregator *Cfg) void *vData; At = GetNewHashPos(Cfg->OtherQRnumbers, 0); - while (GetNextHashPos(Cfg->OtherQRnumbers, At, &HKLen, &HK, &vData) && + while (! server_shutting_down && + GetNextHashPos(Cfg->OtherQRnumbers, + At, + &HKLen, &HK, + &vData) && (vData != NULL)) { long *lData = (long*) vData; DeleteRoomReference(*lData); } -/* - if (server_shutting_down) - break; / * TODO */ DeleteHashPos(&At); } @@ -155,37 +158,12 @@ void UnlinkRSSAggregator(rss_aggregator *Cfg) DeleteHashPos(&At); last_run = time(NULL); } -/* -eNextState FreeNetworkSaveMessage (AsyncIO *IO) -{ - networker_save_message *Ctx = (networker_save_message *) IO->Data; - - pthread_mutex_lock(&RSSQueueMutex); - Ctx->Cfg->RefCount --; - - if (Ctx->Cfg->RefCount == 0) - { - UnlinkRSSAggregator(Ctx->Cfg); - - } - pthread_mutex_unlock(&RSSQueueMutex); - CtdlFreeMessage(Ctx->Msg); - free_recipients(Ctx->recp); - FreeStrBuf(&Ctx->Message); - FreeStrBuf(&Ctx->MsgGUID); - ((struct CitContext*)IO->CitContext)->state = CON_IDLE; - ((struct CitContext*)IO->CitContext)->kill_me = 1; - free(Ctx); - last_run = time(NULL); - return eAbort; -} -*/ void FreeNetworkSaveMessage (void *vMsg) { networker_save_message *Msg = (networker_save_message *) vMsg; - CtdlFreeMessage(Msg->Msg); + CtdlFreeMessageContents(&Msg->Msg); FreeStrBuf(&Msg->Message); FreeStrBuf(&Msg->MsgGUID); free(Msg); @@ -202,14 +180,20 @@ eNextState RSSSaveMessage(AsyncIO *IO) const char *Key; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; - Ctx->ThisMsg->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message); + Ctx->ThisMsg->Msg.cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message); - CtdlSubmitMsg(Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0); + CtdlSubmitMsg(&Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0); /* write the uidl to the use table so we don't store this item again */ - cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); - - if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg)) + cdb_store(CDB_USETABLE, + SKEY(Ctx->ThisMsg->MsgGUID), + &Ctx->ThisMsg->ut, + sizeof(struct UseTable) ); + + if (GetNextHashPos(Ctx->Messages, + Ctx->Pos, + &len, &Key, + (void**) &Ctx->ThisMsg)) return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry); else return eAbort; @@ -222,25 +206,32 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; - /* Find out if we've already seen this item */ - strcpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO + strcpy(Ctx->ThisMsg->ut.ut_msgid, + ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO Ctx->ThisMsg->ut.ut_timestamp = time(NULL); cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID)); #ifndef DEBUG_RSS if (cdbut != NULL) { /* Item has already been seen */ - EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID)); + EV_syslog(LOG_DEBUG, + "%s has already been seen\n", + ChrPtr(Ctx->ThisMsg->MsgGUID)); cdb_free(cdbut); /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(Ctx->ThisMsg->MsgGUID), + cdb_store(CDB_USETABLE, + SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); - if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg)) - return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry); + if (GetNextHashPos(Ctx->Messages, + Ctx->Pos, + &len, &Key, + (void**) &Ctx->ThisMsg)) + return NextDBOperation( + IO, + RSS_FetchNetworkUsetableEntry); else return eAbort; } @@ -251,48 +242,26 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) return eSendMore; } } -/* -void RSSAddSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregat *Cfg) -{ - networker_save_message *Ctx; - - pthread_mutex_lock(&RSSQueueMutex); - Cfg->RefCount ++; - pthread_mutex_unlock(&RSSQueueMutex); - - - Ctx = (networker_save_message *) malloc(sizeof(networker_save_message)); - memset(Ctx, 0, sizeof(networker_save_message)); - - Ctx->MsgGUID = MsgGUID; - Ctx->Message = MessageBody; - Ctx->Msg = Msg; - Ctx->Cfg = Cfg; - Ctx->recp = recp; - Ctx->IO.Data = Ctx; - Ctx->IO.CitContext = CloneContext(&rss_CC); - Ctx->IO.Terminate = FreeNetworkSaveMessage; - Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage; - QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry); -} -*/ /* * Commit a fetched and parsed RSS item to disk */ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) { - + networker_save_message *SaveMsg; struct MD5Context md5context; u_char rawdigest[MD5_DIGEST_LEN]; - struct CtdlMessage *msg; int msglen = 0; StrBuf *Message; StrBuf *guid; AsyncIO *IO = &Cfg->IO; - int n; - + + + SaveMsg = (networker_save_message *) malloc( + sizeof(networker_save_message)); + memset(SaveMsg, 0, sizeof(networker_save_message)); + /* Construct a GUID to use in the S_USETABLE table. * If one is not present in the item itself, make one up. */ @@ -305,13 +274,16 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) else { MD5Init(&md5context); if (ri->title != NULL) { - MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->title), StrLength(ri->title)); + MD5Update(&md5context, + (const unsigned char*)SKEY(ri->title)); } if (ri->link != NULL) { - MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link)); + MD5Update(&md5context, + (const unsigned char*)SKEY(ri->link)); } MD5Final(rawdigest, &md5context); - guid = NewStrBufPlain(NULL, MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/); + guid = NewStrBufPlain(NULL, + MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/); StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN); StrBufAppendBufPlain(guid, HKEY("_rss2ctdl"), 0); } @@ -320,23 +292,21 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) EVM_syslog(LOG_DEBUG, "RSS: translating item...\n"); if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY("")); StrBufSpaceToBlank(ri->description); - msg = malloc(sizeof(struct CtdlMessage)); - memset(msg, 0, sizeof(struct CtdlMessage)); - msg->cm_magic = CTDLMESSAGE_MAGIC; - msg->cm_anon_type = MES_NORMAL; - msg->cm_format_type = FMT_RFC822; + SaveMsg->Msg.cm_magic = CTDLMESSAGE_MAGIC; + SaveMsg->Msg.cm_anon_type = MES_NORMAL; + SaveMsg->Msg.cm_format_type = FMT_RFC822; if (ri->guid != NULL) { - msg->cm_fields['E'] = strdup(ChrPtr(ri->guid)); + SaveMsg->Msg.cm_fields['E'] = strdup(ChrPtr(ri->guid)); } if (ri->author_or_creator != NULL) { char *From; StrBuf *Encoded = NULL; int FromAt; - + From = html_to_ascii(ChrPtr(ri->author_or_creator), - StrLength(ri->author_or_creator), + StrLength(ri->author_or_creator), 512, 0); StrBufPlain(ri->author_or_creator, From, -1); StrBufTrim(ri->author_or_creator); @@ -346,21 +316,27 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) if (!FromAt && StrLength (ri->author_email) > 0) { StrBufRFC2047encode(&Encoded, ri->author_or_creator); - msg->cm_fields['A'] = SmashStrBuf(&Encoded); - msg->cm_fields['P'] = SmashStrBuf(&ri->author_email); + SaveMsg->Msg.cm_fields['A'] = SmashStrBuf(&Encoded); + SaveMsg->Msg.cm_fields['P'] = + SmashStrBuf(&ri->author_email); } else { if (FromAt) { - msg->cm_fields['A'] = SmashStrBuf(&ri->author_or_creator); - msg->cm_fields['P'] = strdup(msg->cm_fields['A']); + SaveMsg->Msg.cm_fields['A'] = + SmashStrBuf(&ri->author_or_creator); + SaveMsg->Msg.cm_fields['P'] = + strdup(SaveMsg->Msg.cm_fields['A']); } - else + else { - StrBufRFC2047encode(&Encoded, ri->author_or_creator); - msg->cm_fields['A'] = SmashStrBuf(&Encoded); - msg->cm_fields['P'] = strdup("rss@localhost"); + StrBufRFC2047encode(&Encoded, + ri->author_or_creator); + SaveMsg->Msg.cm_fields['A'] = + SmashStrBuf(&Encoded); + SaveMsg->Msg.cm_fields['P'] = + strdup("rss@localhost"); } if (ri->pubdate <= 0) { @@ -369,10 +345,10 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) } } else { - msg->cm_fields['A'] = strdup("rss"); + SaveMsg->Msg.cm_fields['A'] = strdup("rss"); } - msg->cm_fields['N'] = strdup(NODENAME); + SaveMsg->Msg.cm_fields['N'] = strdup(NODENAME); if (ri->title != NULL) { long len; char *Sbj; @@ -394,21 +370,23 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) StrBufTrim(Encoded); StrBufRFC2047encode(&QPEncoded, Encoded); - msg->cm_fields['U'] = SmashStrBuf(&QPEncoded); + SaveMsg->Msg.cm_fields['U'] = SmashStrBuf(&QPEncoded); FreeStrBuf(&Encoded); } - msg->cm_fields['T'] = malloc(64); - snprintf(msg->cm_fields['T'], 64, "%ld", ri->pubdate); + SaveMsg->Msg.cm_fields['T'] = malloc(64); + snprintf(SaveMsg->Msg.cm_fields['T'], 64, "%ld", ri->pubdate); if (ri->channel_title != NULL) { if (StrLength(ri->channel_title) > 0) { - msg->cm_fields['O'] = strdup(ChrPtr(ri->channel_title)); + SaveMsg->Msg.cm_fields['O'] = + strdup(ChrPtr(ri->channel_title)); } } - if (ri->link == NULL) + if (ri->link == NULL) ri->link = NewStrBufPlain(HKEY("")); #if 0 /* temporarily disable shorter urls. */ - msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description); + SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] = + GetShorterUrls(ri->description); #endif msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ; @@ -419,7 +397,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n" "\n")); #if 0 /* disable shorter url for now. */ - msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message); + SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message); #endif StrBufAppendBuf(Message, ri->description, 0); StrBufAppendBufPlain(Message, HKEY("

\n"), 0); @@ -428,16 +406,8 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this"); StrBufAppendBufPlain(Message, HKEY("\n"), 0); - - - networker_save_message *SaveMsg; - - SaveMsg = (networker_save_message *) malloc(sizeof(networker_save_message)); - memset(SaveMsg, 0, sizeof(networker_save_message)); - SaveMsg->MsgGUID = guid; SaveMsg->Message = Message; - SaveMsg->Msg = msg; n = GetCount(Cfg->Messages) + 1; Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage); @@ -451,11 +421,9 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) int rss_do_fetching(rss_aggregator *Cfg) { rss_item *ri; - time_t now; - AsyncIO *IO; - now = time(NULL); + now = time(NULL); if ((Cfg->next_poll != 0) && (now < Cfg->next_poll)) return 0; @@ -463,31 +431,27 @@ int rss_do_fetching(rss_aggregator *Cfg) ri = (rss_item*) malloc(sizeof(rss_item)); memset(ri, 0, sizeof(rss_item)); Cfg->Item = ri; - IO = &Cfg->IO; - IO->CitContext = CloneContext(&rss_CC); - IO->Data = Cfg; - - safestrncpy(((CitContext*)IO->CitContext)->cs_host, - ChrPtr(Cfg->Url), - sizeof(((CitContext*)IO->CitContext)->cs_host)); - syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url)); - ParseURL(&IO->ConnectMe, Cfg->Url, 80); - CurlPrepareURL(IO->ConnectMe); - - if (! evcurl_init(IO, -// Ctx, - NULL, - "Citadel RSS Client", - ParseRSSReply, - RSSAggregatorTerminate, - RSSAggregatorShutdownAbort)) + if (! InitcURLIOStruct(&Cfg->IO, + Cfg, + "Citadel RSS Client", + RSSAggregator_ParseReply, + RSSAggregator_Terminate, + RSSAggregator_ShutdownAbort)) { - syslog(LOG_DEBUG, "Unable to initialize libcurl.\n"); + syslog(LOG_ALERT, "Unable to initialize libcurl.\n"); return 0; } - QueueCurlContext(IO); + safestrncpy(((CitContext*)Cfg->IO.CitContext)->cs_host, + ChrPtr(Cfg->Url), + sizeof(((CitContext*)Cfg->IO.CitContext)->cs_host)); + + syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(Cfg->Url)); + ParseURL(&Cfg->IO.ConnectMe, Cfg->Url, 80); + CurlPrepareURL(Cfg->IO.ConnectMe); + + QueueCurlContext(&Cfg->IO); return 1; } @@ -531,7 +495,7 @@ void DeleteRssCfg(void *vptr) free(rncptr); } -eNextState RSSAggregatorTerminate(AsyncIO *IO) +eNextState RSSAggregator_Terminate(AsyncIO *IO) { rss_aggregator *rncptr = (rss_aggregator *)IO->Data; @@ -541,7 +505,7 @@ eNextState RSSAggregatorTerminate(AsyncIO *IO) UnlinkRSSAggregator(rncptr); return eAbort; } -eNextState RSSAggregatorShutdownAbort(AsyncIO *IO) +eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO) { const char *pUrl; rss_aggregator *rncptr = (rss_aggregator *)IO->Data; @@ -579,10 +543,10 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) pthread_mutex_lock(&RSSQueueMutex); if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr)) { - syslog(LOG_DEBUG, - "rssclient: [%ld] %s already in progress.\n", - qrbuf->QRnumber, - qrbuf->QRname); + syslog(LOG_DEBUG, + "rssclient: [%ld] %s already in progress.\n", + qrbuf->QRnumber, + qrbuf->QRname); pthread_mutex_unlock(&RSSQueueMutex); return; } @@ -592,11 +556,13 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) if (server_shutting_down) return; - + /* Only do net processing for rooms that have netconfigs */ fd = open(filename, 0); if (fd <= 0) { - //syslog(LOG_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname); + /* syslog(LOG_DEBUG, + "rssclient: %s no config.\n", + qrbuf->QRname); */ return; } @@ -604,8 +570,10 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) return; if (fstat(fd, &statbuf) == -1) { - syslog(LOG_DEBUG, "ERROR: could not stat configfile '%s' - %s\n", - filename, strerror(errno)); + syslog(LOG_DEBUG, + "ERROR: could not stat configfile '%s' - %s\n", + filename, + strerror(errno)); return; } @@ -624,7 +592,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) close(fd); if (server_shutting_down) return; - + CfgPtr = NULL; CfgType = NewStrBuf(); Line = NewStrBufPlain(NULL, StrLength(CfgData)); @@ -656,16 +624,20 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) if (use_this_rncptr != NULL) { long *QRnumber; - StrBufAppendBufPlain(use_this_rncptr->rooms, - qrbuf->QRname, + StrBufAppendBufPlain(use_this_rncptr->rooms, + qrbuf->QRname, -1, 0); if (use_this_rncptr->roomlist_parts == 1) { - use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash); + use_this_rncptr->OtherQRnumbers = + NewHash(1, lFlathash); } QRnumber = (long*)malloc(sizeof(long)); *QRnumber = qrbuf->QRnumber; - Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); + Put(use_this_rncptr->OtherQRnumbers, + LKEY(qrbuf->QRnumber), + QRnumber, + NULL); use_this_rncptr->roomlist_parts++; pthread_mutex_unlock(&RSSQueueMutex); @@ -678,7 +650,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) pthread_mutex_unlock(&RSSQueueMutex); rncptr->ItemType = RSS_UNSET; - + rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1); pthread_mutex_lock(&RSSQueueMutex); @@ -718,8 +690,8 @@ void rssclient_scan(void) { } /* - * This is a simple concurrency check to make sure only one rssclient run - * is done at a time. We could do this with a mutex, but since we + * This is a simple concurrency check to make sure only one rssclient + * run is done at a time. We could do this with a mutex, but since we * don't really require extremely fine granularity here, we'll do it * with a static variable instead. */ @@ -728,6 +700,7 @@ void rssclient_scan(void) { if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0)) return; + become_session(&rss_CC); syslog(LOG_DEBUG, "rssclient started\n"); CtdlForEachRoom(rssclient_scan_room, NULL); @@ -735,7 +708,7 @@ void rssclient_scan(void) { it = GetNewHashPos(RSSFetchUrls, 0); while (!server_shutting_down && - GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && (vrptr != NULL)) { rptr = (rss_aggregator *)vrptr; if (!rss_do_fetching(rptr)) @@ -767,7 +740,7 @@ CTDL_MODULE_INIT(rssclient) RSSFetchUrls = NewHash(1, NULL); syslog(LOG_INFO, "%s\n", curl_version()); CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER); - CtdlRegisterCleanupHook(rss_cleanup); + CtdlRegisterCleanupHook(rss_cleanup); } return "rssclient"; } diff --git a/citadel/msgbase.c b/citadel/msgbase.c index c583263eb..2609d1d1d 100644 --- a/citadel/msgbase.c +++ b/citadel/msgbase.c @@ -1239,26 +1239,28 @@ int is_valid_message(struct CtdlMessage *msg) { return 1; } +void CtdlFreeMessageContents(struct CtdlMessage *msg) +{ + int i; + for (i = 0; i < 256; ++i) + if (msg->cm_fields[i] != NULL) { + free(msg->cm_fields[i]); + } + + msg->cm_magic = 0; /* just in case */ +} /* * 'Destructor' for struct CtdlMessage */ void CtdlFreeMessage(struct CtdlMessage *msg) { - int i; - if (is_valid_message(msg) == 0) { if (msg != NULL) free (msg); return; } - - for (i = 0; i < 256; ++i) - if (msg->cm_fields[i] != NULL) { - free(msg->cm_fields[i]); - } - - msg->cm_magic = 0; /* just in case */ + CtdlFreeMessageContents(msg); free(msg); } diff --git a/citadel/msgbase.h b/citadel/msgbase.h index 3171fbd75..37a94c63d 100644 --- a/citadel/msgbase.h +++ b/citadel/msgbase.h @@ -140,6 +140,7 @@ void CtdlWriteObject(char *req_room, /* Room to stuff it in */ ); struct CtdlMessage *CtdlFetchMessage(long msgnum, int with_body); void CtdlFreeMessage(struct CtdlMessage *msg); +void CtdlFreeMessageContents(struct CtdlMessage *msg); void serialize_message(struct ser_ret *, struct CtdlMessage *); void dump_message(struct CtdlMessage *msg, long Siz); int is_valid_message(struct CtdlMessage *); -- 2.30.2