From 1dd7c0f770f23bbc4f65de406deb75eb059b417f Mon Sep 17 00:00:00 2001
From: Wilfried Goesgens
Date: Sun, 3 Jul 2011 13:49:59 +0000
Subject: [PATCH] first working RSS collection with async DB saves

- for now, no shortened-URL expansion
- replaced the config handling; the old algorithm wasn't thread-safe
- DB I/O is now done in its own libev queue in its own thread, so database
  I/O no longer blocks network I/O.
---
 citadel/event_client.c                             | 118 +++++-
 citadel/event_client.h                             |   5 +-
 .../modules/eventclient/serv_eventclient.c         | 176 ++++++--
 citadel/modules/rssclient/rss_atom_parser.c        |   1 +
 citadel/modules/rssclient/rss_atom_parser.h        |   1 +
 citadel/modules/rssclient/serv_rssclient.c         | 381 +++++++++++++-----
 .../urldeshortener/serv_expand_shorter_urls.c      |   4 +-
 7 files changed, 518 insertions(+), 168 deletions(-)

diff --git a/citadel/event_client.c b/citadel/event_client.c
index 37a943a45..dbabc743a 100644
--- a/citadel/event_client.c
+++ b/citadel/event_client.c
@@ -67,21 +67,104 @@
 #include "event_client.h"
 
-extern citthread_mutex_t EventQueueMutex;
-extern HashList *InboundEventQueue;
-extern struct ev_loop *event_base;
-extern ev_async AddJob;
-extern ev_async ExitEventLoop;
-
-static void
-IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents)
+static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents)
 {
     CtdlLogPrintf(CTDL_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
     AsyncIO *IO = watcher->data;
+    assert(IO->ShutdownAbort);
     IO->ShutdownAbort(IO);
 }
+
+
+/*--------------------------------------------------------------------------------
+ * Server DB IO
+ */
+extern int evdb_count;
+extern citthread_mutex_t DBEventQueueMutex;
+extern HashList *DBInboundEventQueue;
+extern struct ev_loop *event_db;
+extern ev_async DBAddJob;
+extern ev_async DBExitEventLoop;
+
+int QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
+{
+    IOAddHandler *h;
+    int i;
+
+    h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
+    h->IO = IO;
+    h->EvAttch = CB;
+    ev_cleanup_init(&IO->abort_by_shutdown,
+                    IO_abort_shutdown_callback);
+    IO->abort_by_shutdown.data = IO;
+    ev_cleanup_start(event_db, &IO->abort_by_shutdown);
+
+    citthread_mutex_lock(&DBEventQueueMutex);
+    CtdlLogPrintf(CTDL_DEBUG, "DBEVENT Q\n");
+    i = ++evdb_count;
+    Put(DBInboundEventQueue, IKEY(i), h, NULL);
+    citthread_mutex_unlock(&DBEventQueueMutex);
+
+    ev_async_send (event_db, &DBAddJob);
+    CtdlLogPrintf(CTDL_DEBUG, "DBEVENT Q Done.\n");
+    return 0;
+}
+
+void ShutDownDBCLient(AsyncIO *IO)
+{
+    CitContext *Ctx = IO->CitContext;
+    become_session(Ctx);
+
+    CtdlLogPrintf(CTDL_DEBUG, "DBEVENT\n");
+    ev_cleanup_stop(event_db, &IO->abort_by_shutdown);
+
+    assert(IO->Terminate);
+    IO->Terminate(IO);
+
+    Ctx->state = CON_IDLE;
+    Ctx->kill_me = 1;
+}
+
+void
+DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
+{
+    AsyncIO *IO = watcher->data;
+    CtdlLogPrintf(CTDL_DEBUG, "event: %s\n", __FUNCTION__);
+    become_session(IO->CitContext);
+    ev_idle_stop(event_db, &IO->unwind_stack);
+
+    assert(IO->ReadDone);
+    switch (IO->ReadDone(IO))
+    {
+    case eAbort:
+        ShutDownDBCLient(IO);
+    default:
+        break;
+    }
+}
+
+void NextDBOperation(AsyncIO *IO, IO_CallBack CB)
+{
+    IO->ReadDone = CB;
+    ev_idle_init(&IO->unwind_stack,
+                 DB_PerformNext);
+    IO->unwind_stack.data = IO;
+    ev_idle_start(event_db, &IO->unwind_stack);
+}
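A note on the pattern above: ev_async_send() is the one libev call that is safe to make from a foreign thread, so QueueDBOperation() only needs the mutex around the queue itself; the DB loop wakes up in its own thread and drains whatever has accumulated. A minimal standalone sketch of that handoff, assuming plain pthreads and a linked list in place of the citthread wrappers and HashList (all names here are illustrative, not part of the patch):

    #include <ev.h>
    #include <pthread.h>
    #include <stdlib.h>

    typedef struct job {
        void (*fn)(void *);
        void *arg;
        struct job *next;
    } job;

    static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
    static job *q_head;
    static struct ev_loop *db_loop;
    static ev_async add_job;

    /* runs in the DB thread whenever a producer called ev_async_send() */
    static void drain_jobs(EV_P_ ev_async *w, int revents)
    {
        job *j, *next;

        pthread_mutex_lock(&q_mutex);
        j = q_head;
        q_head = NULL;
        pthread_mutex_unlock(&q_mutex);

        for (; j != NULL; j = next) {
            next = j->next;
            j->fn(j->arg);          /* the queued DB operation */
            free(j);
        }
    }

    /* called from the network thread; ev_async_send() is thread-safe */
    void queue_job(void (*fn)(void *), void *arg)
    {
        job *j = malloc(sizeof *j);

        j->fn = fn;
        j->arg = arg;
        pthread_mutex_lock(&q_mutex);
        j->next = q_head;
        q_head = j;
        pthread_mutex_unlock(&q_mutex);
        ev_async_send(db_loop, &add_job);
    }

    static void *db_thread(void *arg)
    {
        ev_run(db_loop, 0);     /* runs until someone calls ev_break() */
        return NULL;
    }

    int main(void)
    {
        pthread_t t;

        db_loop = ev_loop_new(EVFLAG_AUTO);
        ev_async_init(&add_job, drain_jobs);
        ev_async_start(db_loop, &add_job);  /* before any producer runs */
        pthread_create(&t, NULL, db_thread, NULL);
        /* ... queue_job(...) from this or any other thread ... */
        pthread_join(t, NULL);
        return 0;
    }

The producer never touches the loop except through ev_async_send(); no other watcher, timer, or loop call is thread-safe in libev, which is why the patch funnels everything through this one primitive.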
+
+/*--------------------------------------------------------------------------------
+ * Client IO
+ */
+extern int evbase_count;
+extern citthread_mutex_t EventQueueMutex;
+extern HashList *InboundEventQueue;
+extern struct ev_loop *event_base;
+extern ev_async AddJob;
+extern ev_async ExitEventLoop;
+
+
 int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
 {
     IOAddHandler *h;
@@ -97,7 +180,7 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
 
     citthread_mutex_lock(&EventQueueMutex);
     CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
-    i = GetCount(InboundEventQueue);
+    i = ++evbase_count;
     Put(InboundEventQueue, IKEY(i), h, NULL);
     citthread_mutex_unlock(&EventQueueMutex);
 
@@ -106,9 +189,12 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
     return 0;
 }
 
-
 int ShutDownEventQueue(void)
 {
+    citthread_mutex_lock(&DBEventQueueMutex);
+    ev_async_send (event_db, &DBExitEventLoop);
+    citthread_mutex_unlock(&DBEventQueueMutex);
+
     citthread_mutex_lock(&EventQueueMutex);
     ev_async_send (EV_DEFAULT_ &ExitEventLoop);
     citthread_mutex_unlock(&EventQueueMutex);
@@ -142,9 +228,12 @@ void StopClientWatchers(AsyncIO *IO)
 
 void ShutDownCLient(AsyncIO *IO)
 {
+    CitContext *Ctx = IO->CitContext;
+    become_session(Ctx);
+
     CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->SendBuf.fd);
 
-    ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
+    ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
 
     StopClientWatchers(IO);
     if (IO->DNSChannel != NULL) {
@@ -154,9 +243,9 @@ void ShutDownCLient(AsyncIO *IO)
         IO->DNSChannel = NULL;
     }
     assert(IO->Terminate);
-    become_session(IO->CitContext);
     IO->Terminate(IO);
-
+    Ctx->state = CON_IDLE;
+    Ctx->kill_me = 1;
 }
 
 
@@ -244,6 +333,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 
         fprintf(fd, "Read: BufSize: %ld BufContent: [", nbytes);
         rv = fwrite(pchh, nbytes, 1, fd);
+        if (!rv) printf("failed to write debug!");
         fprintf(fd, "]\n");
 
 
@@ -457,7 +547,6 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r
     if (IO->SendBuf.fd < 0) {
         CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno));
         StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno));
-//      freeaddrinfo(res);
         return eAbort;
     }
     fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
@@ -498,7 +587,6 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r
 
     if (rc >= 0){
         CtdlLogPrintf(CTDL_DEBUG, "connect() immediate success.\n");
-////        freeaddrinfo(res);
         set_start_callback(event_base, IO, 0);
         ev_timer_start(event_base, &IO->rw_timeout);
         return IO->NextState;
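The header change below pins EV_COMPAT3 to 0, which disables libev's 3.x compatibility aliases; the rest of the patch therefore uses the libev 4 names. For reference — this snippet is not part of the patch — the renames it relies on:

    #include <ev.h>

    static void loop_lifecycle(void)
    {
        struct ev_loop *loop = ev_loop_new(EVFLAG_AUTO);

        ev_run(loop, 0);        /* libev 3.x called this ev_loop()        */
        ev_loop_destroy(loop);  /* 3.x: ev_default_destroy() for EV_DEFAULT */
    }

    /* and inside a watcher callback, to leave the loop:                  */
    /* ev_break(loop, EVBREAK_ALL);   3.x: ev_unloop(loop, EVUNLOOP_ALL)  */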
diff --git a/citadel/event_client.h b/citadel/event_client.h
index 23fdf68db..4a8b0f02f 100644
--- a/citadel/event_client.h
+++ b/citadel/event_client.h
@@ -1,3 +1,4 @@
+#define EV_COMPAT3 0
 #include
 #include
 #include
@@ -95,7 +96,7 @@ struct AsyncIO {
 
     /* Saving / loading a message async from / to disk */
     struct CtdlMessage *AsyncMsg;
-    struct recptypes AsyncRcp;
+    struct recptypes *AsyncRcp;
     /* Custom data; it's expected to contain AsyncIO so we can save malloc()s... */
     void *Data;        /* application specific data */
     void *CitContext;  /* Citadel Session context... */
@@ -108,6 +109,8 @@ typedef struct _IOAddHandler {
 
 void FreeAsyncIOContents(AsyncIO *IO);
 
+void NextDBOperation(AsyncIO *IO, IO_CallBack CB);
+int QueueDBOperation(AsyncIO *IO, IO_CallBack CB);
 int QueueEventContext(AsyncIO *IO, IO_CallBack CB);
 int ShutDownEventQueue(void);
diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c
index 971200e52..6f95a6cf0 100644
--- a/citadel/modules/eventclient/serv_eventclient.c
+++ b/citadel/modules/eventclient/serv_eventclient.c
@@ -60,24 +60,11 @@
 #include "event_client.h"
 #include "serv_curl.h"
 
-int event_add_pipe[2] = {-1, -1};
-
-citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
-HashList *QueueEvents = NULL;
-
-HashList *InboundEventQueue = NULL;
-HashList *InboundEventQueues[2] = { NULL, NULL };
-
-struct ev_loop *event_base;
-
-ev_async AddJob;
-ev_async ExitEventLoop;
-ev_async WakeupCurl;
-
-extern struct ev_loop *event_base;
-
-void SockStateCb(void *data, int sock, int read, int write);
+ev_loop *event_base;
 
+/*****************************************************************************
+ *                   libevent / curl integration                            *
+ *****************************************************************************/
 #define MOPT(s, v)                                              \
     do {                                                        \
         sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
@@ -87,11 +74,6 @@ void SockStateCb(void *data, int sock, int read, int write);
         }                                                       \
     } while (0)
 
-
-
-/*****************************************************************************
- *                   libevent / curl integration                            *
- *****************************************************************************/
 typedef struct _evcurl_global_data {
     int magic;
     CURLM *mhnd;
@@ -105,6 +87,7 @@ typedef struct _sockwatcher_data
     ev_io ioev;
 } sockwatcher_data;
 
+ev_async WakeupCurl;
 evcurl_global_data global;
 
 static void
@@ -257,7 +240,6 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *csoc
 void curl_init_connectionpool(void)
 {
     CURLM *mhnd ;
-//  global.magic = EVCURL_GLOBAL_MAGIC;
     ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
     global.timeev.data = (void *)&global;
 
@@ -269,13 +251,6 @@ void curl_init_connectionpool(void)
         CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta));
         exit(1);
     }
-/*
-    if (!ev_default_loop(EVFLAG_AUTO))
-    {
-        CtdlLogPrintf(CTDL_ERR,"error initializing libev\n");
-        exit(2);
-    }
-*/
     mhnd = global.mhnd = curl_multi_init();
     if (!mhnd)
     {
@@ -288,8 +263,6 @@ void curl_init_connectionpool(void)
     MOPT(TIMERFUNCTION, &gotwatchtime);
     MOPT(TIMERDATA, (void *)&global);
 
-    /* well, just there to fire the sample request?*/
-/// ev_timer_start(EV_DEFAULT, &global.timeev);
     return;
 }
@@ -385,14 +358,11 @@ evcurl_handle_start(AsyncIO *IO)
     if (msta)
         CtdlLogPrintf(CTDL_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta));
     IO->HttpReq.attached = 1;
-// ev_timer_start(EV_DEFAULT, &global.timeev);
     ev_async_send (event_base, &WakeupCurl);
 }
 
 static void
 WakeupCurlCallback(EV_P_ ev_async *w, int revents)
 {
-///    evcurl_global_data *global = cglobal;
-
     CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl multi handle\n");
     curl_multi_perform(&global, CURL_POLL_NONE);
 
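For readers new to curl's multi-socket API, which gotwatchsock()/gotwatchtime() implement above: curl tells the application which fds to watch and when it wants a timer, and the application feeds readiness events back through curl_multi_socket_action(). A self-contained sketch of that contract wired to libev — illustrative names, loosely following libcurl's multi-socket examples rather than the exact code in this file:

    #include <curl/curl.h>
    #include <ev.h>
    #include <stdlib.h>

    static CURLM *multi;
    static struct ev_loop *loop_;
    static ev_timer timeout_w;

    /* reap finished transfers so their easy handles can be detached */
    static void check_multi_info(void)
    {
        CURLMsg *msg;
        int pending;

        while ((msg = curl_multi_info_read(multi, &pending)) != NULL)
            if (msg->msg == CURLMSG_DONE)
                curl_multi_remove_handle(multi, msg->easy_handle);
    }

    /* libev says a watched socket is ready: tell curl about it */
    static void event_cb(EV_P_ ev_io *w, int revents)
    {
        int action = ((revents & EV_READ)  ? CURL_CSELECT_IN  : 0) |
                     ((revents & EV_WRITE) ? CURL_CSELECT_OUT : 0);
        int running;

        curl_multi_socket_action(multi, w->fd, action, &running);
        check_multi_info();
    }

    /* libev says curl's requested timeout expired: kick the multi handle */
    static void timeout_cb(EV_P_ ev_timer *w, int revents)
    {
        int running;

        curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &running);
        check_multi_info();
    }

    /* CURLMOPT_SOCKETFUNCTION: curl tells us which fd to watch, and how */
    static int sock_cb(CURL *easy, curl_socket_t fd, int what, void *userp, void *sockp)
    {
        ev_io *w = sockp;

        if (what == CURL_POLL_REMOVE) {
            if (w != NULL) {
                ev_io_stop(loop_, w);
                free(w);
            }
            return 0;
        }
        if (w == NULL) {
            w = calloc(1, sizeof *w);
            curl_multi_assign(multi, fd, w);   /* handed back as sockp */
        }
        else
            ev_io_stop(loop_, w);
        ev_io_init(w, event_cb, fd,
                   ((what & CURL_POLL_IN)  ? EV_READ  : 0) |
                   ((what & CURL_POLL_OUT) ? EV_WRITE : 0));
        ev_io_start(loop_, w);
        return 0;
    }

    /* CURLMOPT_TIMERFUNCTION: curl tells us when it next needs a kick */
    static int timer_cb(CURLM *m, long timeout_ms, void *userp)
    {
        ev_timer_stop(loop_, &timeout_w);
        if (timeout_ms >= 0) {
            ev_timer_init(&timeout_w, timeout_cb, timeout_ms / 1000.0, 0.);
            ev_timer_start(loop_, &timeout_w);
        }
        return 0;
    }

    static void setup(struct ev_loop *loop)
    {
        loop_ = loop;
        multi = curl_multi_init();
        ev_timer_init(&timeout_w, timeout_cb, 0., 0.);
        curl_multi_setopt(multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
        curl_multi_setopt(multi, CURLMOPT_TIMERFUNCTION, timer_cb);
    }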
@@ -405,7 +375,19 @@ static void evcurl_shutdown (void)
 /*****************************************************************************
  *                       libevent integration                               *
  *****************************************************************************/
+/*
+ * client event queue plus its methods.
+ * this currently is the main loop (which may change in some future?)
+ */
+int evbase_count = 0;
+int event_add_pipe[2] = {-1, -1};
+citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
+HashList *QueueEvents = NULL;
+HashList *InboundEventQueue = NULL;
+HashList *InboundEventQueues[2] = { NULL, NULL };
+ev_async AddJob;
+ev_async ExitEventLoop;
 
 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
 {
@@ -442,7 +424,7 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
 
 static void EventExitCallback(EV_P_ ev_async *w, int revents)
 {
-    ev_unloop(event_base, EVUNLOOP_ALL);
+    ev_break(event_base, EVBREAK_ALL);
 
     CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n");
 }
@@ -475,9 +457,9 @@ void InitEventQueue(void)
  */
 void *client_event_thread(void *arg)
 {
-    struct CitContext libevent_client_CC;
+    struct CitContext libev_client_CC;
 
-    CtdlFillSystemContext(&libevent_client_CC, "LibEv Thread");
+    CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
 
     CtdlLogPrintf(CTDL_DEBUG, "client_ev_thread() initializing\n");
@@ -492,11 +474,11 @@ void *client_event_thread(void *arg)
 
     curl_init_connectionpool();
 
-    ev_loop (event_base, 0);
+    ev_run (event_base, 0);
 
     CtdlClearSystemContext();
-    ev_default_destroy ();
+    ev_loop_destroy (EV_DEFAULT_UC);
 
     DeleteHash(&QueueEvents);
     InboundEventQueue = NULL;
@@ -507,6 +489,118 @@ void *client_event_thread(void *arg)
     return(NULL);
 }
 
+/*------------------------------------------------------------------------------*/
+/*
+ * DB-Queue; does async bdb operations.
+ * has its own set of handlers.
+ */
+ev_loop *event_db;
+int evdb_count = 0;
+int evdb_add_pipe[2] = {-1, -1};
+citthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
+HashList *DBQueueEvents = NULL;
+HashList *DBInboundEventQueue = NULL;
+HashList *DBInboundEventQueues[2] = { NULL, NULL };
+
+ev_async DBAddJob;
+ev_async DBExitEventLoop;
+
+static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
+{
+    HashList *q;
+    void *v;
+    HashPos *It;
+    long len;
+    const char *Key;
+
+    /* get the control command... */
+    citthread_mutex_lock(&DBEventQueueMutex);
+
+    if (DBInboundEventQueues[0] == DBInboundEventQueue) {
+        DBInboundEventQueue = DBInboundEventQueues[1];
+        q = DBInboundEventQueues[0];
+    }
+    else {
+        DBInboundEventQueue = DBInboundEventQueues[0];
+        q = DBInboundEventQueues[1];
+    }
+    citthread_mutex_unlock(&DBEventQueueMutex);
+
+    It = GetNewHashPos(q, 0);
+    while (GetNextHashPos(q, It, &len, &Key, &v))
+    {
+        IOAddHandler *h = v;
+        h->EvAttch(h->IO);
+    }
+    DeleteHashPos(&It);
+    DeleteHashContent(&q);
+    CtdlLogPrintf(CTDL_DEBUG, "DBEVENT Q Read done.\n");
+}
+
+
+static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
+{
+    ev_break(event_db, EVBREAK_ALL);
+
+    CtdlLogPrintf(CTDL_DEBUG, "DBEVENT Q exiting.\n");
+}
+
+
+
+void DBInitEventQueue(void)
+{
+    struct rlimit LimitSet;
+
+    citthread_mutex_init(&DBEventQueueMutex, NULL);
+
+    if (pipe(evdb_add_pipe) != 0) {
+        CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
+        abort();
+    }
+    LimitSet.rlim_cur = 1;
+    LimitSet.rlim_max = 1;
+    setrlimit(evdb_add_pipe[1], &LimitSet);
+
+    DBQueueEvents = NewHash(1, Flathash);
+    DBInboundEventQueues[0] = NewHash(1, Flathash);
+    DBInboundEventQueues[1] = NewHash(1, Flathash);
+    DBInboundEventQueue = DBInboundEventQueues[0];
+}
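DBQueueEventAddCallback() above drains its inbound queue with a two-slot swap: producers always Put() into the slot DBInboundEventQueue points at, and the consumer flips that pointer under the mutex, then walks the retired slot without holding the lock, so producers are blocked only for the duration of the flip. The core of that idea reduced to a sketch (generic type standing in for HashList, names illustrative):

    #include <pthread.h>

    typedef struct queue queue;   /* stands in for HashList */

    /*
     * Flip the active inbound slot under the lock and hand the retired
     * slot to the caller, which may drain it without blocking producers.
     */
    static queue *swap_inbound(queue *slots[2], queue **active,
                               pthread_mutex_t *mtx)
    {
        queue *retired;

        pthread_mutex_lock(mtx);
        retired = *active;
        *active = (retired == slots[0]) ? slots[1] : slots[0];
        pthread_mutex_unlock(mtx);

        return retired;   /* now private to the consumer thread */
    }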
+
+/*
+ * this thread operates writing to the message database via libev.
+ */
+void *db_event_thread(void *arg)
+{
+    struct CitContext libev_msg_CC;
+
+    CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
+// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+    CtdlLogPrintf(CTDL_DEBUG, "client_msgev_thread() initializing\n");
+
+    event_db = ev_loop_new (EVFLAG_AUTO);
+
+    ev_async_init(&DBAddJob, DBQueueEventAddCallback);
+    ev_async_start(event_db, &DBAddJob);
+    ev_async_init(&DBExitEventLoop, DBEventExitCallback);
+    ev_async_start(event_db, &DBExitEventLoop);
+
+    ev_run (event_db, 0);
+
+
+    CtdlClearSystemContext();
+    ev_loop_destroy (event_db);
+
+    DeleteHash(&DBQueueEvents);
+    DBInboundEventQueue = NULL;
+    DeleteHash(&DBInboundEventQueues[0]);
+    DeleteHash(&DBInboundEventQueues[1]);
+    citthread_mutex_destroy(&DBEventQueueMutex);
+
+    return(NULL);
+}
 #endif
 
@@ -516,7 +610,9 @@ CTDL_MODULE_INIT(event_client)
     if (!threading)
     {
         InitEventQueue();
+        DBInitEventQueue();
         CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
+        CtdlThreadCreate("DB event", CTDLTHREAD_BIGSTACK, db_event_thread, NULL);
 /// todo register shutdown callback.
     }
 #endif
diff --git a/citadel/modules/rssclient/rss_atom_parser.c b/citadel/modules/rssclient/rss_atom_parser.c
index 503de8e6c..9747fb87b 100644
--- a/citadel/modules/rssclient/rss_atom_parser.c
+++ b/citadel/modules/rssclient/rss_atom_parser.c
@@ -639,6 +639,7 @@ eNextState ParseRSSReply(AsyncIO *IO)
     else
         ptr = "UTF-8";
 
+    CtdlLogPrintf(CTDL_ALERT, "RSS: Now parsing [%s] \n", ChrPtr(rssc->Cfg->Url));
     rssc->xp = XML_ParserCreateNS(ptr, ':');
     if (!rssc->xp) {
diff --git a/citadel/modules/rssclient/rss_atom_parser.h b/citadel/modules/rssclient/rss_atom_parser.h
index 2af7c9fdb..208bbc0bf 100644
--- a/citadel/modules/rssclient/rss_atom_parser.h
+++ b/citadel/modules/rssclient/rss_atom_parser.h
@@ -45,6 +45,7 @@ typedef struct _rss_item {
 typedef struct rssnetcfg rssnetcfg;
 struct rssnetcfg {
+    int Attached;
     rssnetcfg *next;
     StrBuf* Url;
     char *rooms;
diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c
index a01a92f1c..74f1992cb 100644
--- a/citadel/modules/rssclient/serv_rssclient.c
+++ b/citadel/modules/rssclient/serv_rssclient.c
@@ -80,25 +80,95 @@ void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Ti
         StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
     }
 }
\n"), 0); } } +typedef struct __networker_save_message { + AsyncIO IO; + struct CtdlMessage *Msg; + struct recptypes *recp; + StrBuf *MsgGUID; + StrBuf *Message; + struct UseTable ut; +} networker_save_message; + +eNextState FreeNetworkSaveMessage (AsyncIO *IO) +{ + networker_save_message *Ctx = (networker_save_message *) IO->Data; + + CtdlFreeMessage(Ctx->Msg); + free_recipients(Ctx->recp); + FreeStrBuf(&Ctx->MsgGUID); + free(Ctx); + return eAbort; +} + +eNextState AbortNetworkSaveMessage (AsyncIO *IO) +{ + return eAbort; ///TODO +} -void RSSSaveMessage(struct CtdlMessage *Msg, rss_item *ri, struct UseTable *ut) +eNextState RSSSaveMessage(AsyncIO *IO) { + networker_save_message *Ctx = (networker_save_message *) IO->Data; - CtdlSubmitMsg(msg, recp, NULL, 0); - CtdlFreeMessage(msg); + Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message); + + CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0); /* write the uidl to the use table so we don't store this item again */ - cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid), &ut, sizeof(struct UseTable) ); - free(ut); + cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) ); + + return eTerminateConnection; } +// TODO: relink me: ExpandShortUrls(ri->description); -rss_save_msg(msg, recp) +eNextState FetchNetworkUsetableEntry(AsyncIO *IO) { + struct cdbdata *cdbut; + networker_save_message *Ctx = (networker_save_message *) IO->Data; + + /* Find out if we've already seen this item */ + strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO + Ctx->ut.ut_timestamp = time(NULL); + cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID)); +#ifndef DEBUG_RSS + if (cdbut != NULL) { + /* Item has already been seen */ + CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID)); + cdb_free(cdbut); + /* rewrite the record anyway, to update the timestamp */ + cdb_store(CDB_USETABLE, + SKEY(Ctx->MsgGUID), + &Ctx->ut, sizeof(struct UseTable) ); + return eTerminateConnection; + } + else +#endif + { + NextDBOperation(IO, RSSSaveMessage); + return eSendMore; + } +} +void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody) +{ + networker_save_message *Ctx; + + Ctx = (networker_save_message *) malloc(sizeof(networker_save_message)); + memset(Ctx, 0, sizeof(networker_save_message)); + + Ctx->MsgGUID = MsgGUID; + Ctx->Message = MessageBody; + Ctx->Msg = Msg; + Ctx->recp = recp; + Ctx->IO.Data = Ctx; + Ctx->IO.CitContext = CloneContext(CC); + Ctx->IO.Terminate = FreeNetworkSaveMessage; + Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage; + QueueDBOperation(&Ctx->IO, FetchNetworkUsetableEntry); } + /* * Commit a fetched and parsed RSS item to disk */ @@ -107,20 +177,15 @@ void rss_save_item(rss_item *ri) struct MD5Context md5context; u_char rawdigest[MD5_DIGEST_LEN]; - int i; - char utmsgid[SIZ]; - struct cdbdata *cdbut; - struct UseTable ut; struct CtdlMessage *msg; struct recptypes *recp = NULL; int msglen = 0; StrBuf *Message; - AsyncIO *OtherIO; + StrBuf *guid; recp = (struct recptypes *) malloc(sizeof(struct recptypes)); if (recp == NULL) return; memset(recp, 0, sizeof(struct recptypes)); - memset(&ut, 0, sizeof(struct UseTable)); recp->recp_room = strdup(ri->roomlist); recp->num_room = num_tokens(ri->roomlist, '|'); recp->recptypes_magic = RECPTYPES_MAGIC; @@ -131,7 +196,8 @@ void rss_save_item(rss_item *ri) if (ri->guid != NULL) { StrBufSpaceToBlank(ri->guid); StrBufTrim(ri->guid); - snprintf(utmsgid, sizeof utmsgid, "rss/%s", ChrPtr(ri->guid)); + guid = 
+
 
 /*
  * Commit a fetched and parsed RSS item to disk
  */
@@ -107,20 +177,15 @@ void rss_save_item(rss_item *ri)
 
     struct MD5Context md5context;
     u_char rawdigest[MD5_DIGEST_LEN];
-    int i;
-    char utmsgid[SIZ];
-    struct cdbdata *cdbut;
-    struct UseTable ut;
     struct CtdlMessage *msg;
     struct recptypes *recp = NULL;
     int msglen = 0;
     StrBuf *Message;
-    AsyncIO *OtherIO;
+    StrBuf *guid;
 
     recp = (struct recptypes *) malloc(sizeof(struct recptypes));
     if (recp == NULL) return;
     memset(recp, 0, sizeof(struct recptypes));
-    memset(&ut, 0, sizeof(struct UseTable));
     recp->recp_room = strdup(ri->roomlist);
     recp->num_room = num_tokens(ri->roomlist, '|');
     recp->recptypes_magic = RECPTYPES_MAGIC;
@@ -131,7 +196,8 @@ void rss_save_item(rss_item *ri)
     if (ri->guid != NULL) {
         StrBufSpaceToBlank(ri->guid);
         StrBufTrim(ri->guid);
-        snprintf(utmsgid, sizeof utmsgid, "rss/%s", ChrPtr(ri->guid));
+        guid = NewStrBufPlain(HKEY("rss/"));
+        StrBufAppendBuf(guid, ri->guid, 0);
     }
     else {
         MD5Init(&md5context);
@@ -142,12 +208,9 @@ void rss_save_item(rss_item *ri)
             MD5Update(&md5context, (const unsigned char*)ChrPtr(ri->link), StrLength(ri->link));
         }
         MD5Final(rawdigest, &md5context);
-        for (i=0; i<MD5_DIGEST_LEN; ++i) {
-            snprintf(&utmsgid[i*2], 3, "%02X", (unsigned char) (rawdigest[i] & 0xff));
-        }
     }
 
     if (ri->link == NULL) ri->link = NewStrBufPlain(HKEY(""));
-
+#if 0 /* temporarily disable shorter urls. */
     msg->cm_fields[TMP_SHORTER_URLS] = GetShorterUrls(ri->description);
-
-    strcpy(ut->ut_msgid, utmsgid);
-    ut->ut_timestamp = time(NULL);
+#endif
 
     msglen += 1024 + StrLength(ri->link) + StrLength(ri->description) ;
@@ -247,7 +308,9 @@ void rss_save_item(rss_item *ri)
     StrBufPlain(Message, HKEY(
          "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n"
          "<html><body>\n"));
+#if 0 /* disable shorter url for now. */
     msg->cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message);
+#endif
     StrBufAppendBuf(Message, ri->description, 0);
     StrBufAppendBufPlain(Message, HKEY("<br><br>\n"), 0);
\n"), 0); @@ -255,46 +318,11 @@ void rss_save_item(rss_item *ri) AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this"); StrBufAppendBufPlain(Message, HKEY("\n"), 0); - - msg->cm_fields[TMP_MSGDATA] = Message; - - - OtherIO = malloc(sizeof(AsyncIO)); - memset(OtherIO, 0, sizeof(AsyncIO)); - OtherIO->AsyncMsg = msg; - OtherIO->AsyncRcp = recp; - - rss_save_msg(msg, recp); -// msg->cm_fields['M'] = SmashStrBuf(&Message); - - // TODO: reenable me ExpandShortUrls(ri->description); - -/// free_recipients(recp); + RSSQueueSaveMessage(msg, recp, guid, Message); } - - /* Find out if we've already seen this item * / - - cdbut = cdb_fetch(CDB_USETABLE, utmsgid, strlen(utmsgid)); -#ifndef DEBUG_RSS - if (cdbut != NULL) { - /* Item has already been seen * / - CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", utmsgid); - cdb_free(cdbut); - - /* rewrite the record anyway, to update the timestamp * / - strcpy(ut.ut_msgid, utmsgid); - ut.ut_timestamp = time(NULL); - cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid), &ut, sizeof(struct UseTable) ); - } - else -#endif - { -*/ - - /* * Begin a feed parse */ @@ -303,15 +331,13 @@ void rss_do_fetching(rssnetcfg *Cfg) { rss_item *ri; time_t now; - - CURL *chnd; AsyncIO *IO; now = time(NULL); if ((Cfg->next_poll != 0) && (now < Cfg->next_poll)) return; - + Cfg->Attached = 1; ri = (rss_item*) malloc(sizeof(rss_item)); rssc = (rsscollection*) malloc(sizeof(rsscollection)); @@ -338,40 +364,16 @@ void rss_do_fetching(rssnetcfg *Cfg) { CtdlLogPrintf(CTDL_ALERT, "Unable to initialize libcurl.\n"); // goto abort; } - chnd = IO->HttpReq.chnd; evcurl_handle_start(IO); } - +citthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ +HashList *RSSQueueRooms = NULL; +HashList *RSSFetchUrls = NULL; /* - * Scan a room's netconfig to determine whether it is requesting any RSS feeds - */ -void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) -{ - char filename[PATH_MAX]; - char buf[1024]; - char instr[32]; - FILE *fp; - char feedurl[256]; - rssnetcfg *rncptr = NULL; - rssnetcfg *use_this_rncptr = NULL; - int len = 0; - char *ptr = NULL; - - assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); - - if (CtdlThreadCheckStop()) - return; - - /* Only do net processing for rooms that have netconfigs */ - fp = fopen(filename, "r"); - if (fp == NULL) { - return; - } - while (fgets(buf, sizeof buf, fp) != NULL && !CtdlThreadCheckStop()) { buf[strlen(buf)-1] = 0; @@ -384,28 +386,27 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) /* If any other rooms have requested the same feed, then we will just add this * room to the target list for that client request. - */ + * / TODO: how do we do this best? 
+
+/*
+ * Scan a room's netconfig to determine whether it is requesting any RSS feeds
+ */
+void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
+{
+    StrBuf *CfgData;
+    StrBuf *CfgType;
+    StrBuf *Line;
+    RoomCounter *Count = NULL;
+    struct stat statbuf;
+    char filename[PATH_MAX];
+    //char buf[1024];
+    //char instr[32];
+    int fd;
+    int Done;
+    //char feedurl[256];
+    rssnetcfg *rncptr = NULL;
+    rssnetcfg *use_this_rncptr = NULL;
+    //int len = 0;
+    //char *ptr = NULL;
+    void *vptr;
+    const char *CfgPtr, *lPtr;
+    const char *Err;
+
+    citthread_mutex_lock(&RSSQueueMutex);
+    if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
+    {
+        //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s already in progress.\n", qrbuf->QRname);
+        citthread_mutex_unlock(&RSSQueueMutex);
+        return;
+    }
+    citthread_mutex_unlock(&RSSQueueMutex);
+
+    assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
+
+    if (CtdlThreadCheckStop())
+        return;
+
+    /* Only do net processing for rooms that have netconfigs */
+    fd = open(filename, 0);
+    if (fd <= 0) {
+        //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
+        return;
+    }
+    if (CtdlThreadCheckStop())
+        return;
+    if (fstat(fd, &statbuf) == -1) {
+        CtdlLogPrintf(CTDL_DEBUG, "ERROR: could not stat configfile '%s' - %s\n",
+                      filename, strerror(errno));
+        return;
+    }
+    if (CtdlThreadCheckStop())
+        return;
+    CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
+    if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
+        close(fd);
+        FreeStrBuf(&CfgData);
+        CtdlLogPrintf(CTDL_DEBUG, "ERROR: reading config '%s' - %s\n",
\n", + filename, strerror(errno)); + return; + } + close(fd); + if (CtdlThreadCheckStop()) + return; + + CfgPtr = NULL; + CfgType = NewStrBuf(); + Line = NewStrBufPlain(NULL, StrLength(CfgData)); + Done = 0; + while (!Done) + { + Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0; + if (StrLength(Line) > 0) + { + lPtr = NULL; + StrBufExtract_NextToken(CfgType, Line, &lPtr, '|'); + if (!strcmp("rssclient", ChrPtr(CfgType))) + { + if (Count == NULL) + { + Count = malloc(sizeof(RoomCounter)); + Count->count = 0; + } + Count->count ++; + rncptr = (rssnetcfg *) malloc(sizeof(rssnetcfg)); + memset (rncptr, 0, sizeof(rssnetcfg)); + rncptr->Url = NewStrBuf(); + StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|'); + + citthread_mutex_lock(&RSSQueueMutex); + GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr); + use_this_rncptr = (rssnetcfg *)vptr; + citthread_mutex_unlock(&RSSQueueMutex); + + if (use_this_rncptr != NULL) + { + /* mustn't attach to an active session */ + if (use_this_rncptr->Attached == 1) + { + DeleteRssCfg(rncptr); + } + else + { + /* TODO: hook us into the otherone here. */ + } + + continue; + } + + rncptr->ItemType = RSS_UNSET; + + rncptr->rooms = NULL; + rncptr->rooms = strdup(qrbuf->QRname); + + citthread_mutex_lock(&RSSQueueMutex); + Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg); + citthread_mutex_unlock(&RSSQueueMutex); + } + } + } + if (Count != NULL) + { + Count->QRnumber = qrbuf->QRnumber; + citthread_mutex_lock(&RSSQueueMutex); + Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL); + citthread_mutex_unlock(&RSSQueueMutex); + } + FreeStrBuf(&CfgData); + FreeStrBuf(&CfgType); + FreeStrBuf(&Line); } /* * Scan for rooms that have RSS client requests configured */ void rssclient_scan(void) { - static time_t last_run = 0L; static int doing_rssclient = 0; rssnetcfg *rptr = NULL; + void *vrptr = NULL; + HashPos *it; + long len; + const char *Key; /* * This is a simple concurrency check to make sure only one rssclient run @@ -448,25 +597,37 @@ void rssclient_scan(void) { CtdlLogPrintf(CTDL_DEBUG, "rssclient started\n"); CtdlForEachRoom(rssclient_scan_room, NULL); - while (rnclist != NULL && !CtdlThreadCheckStop()) { - rss_do_fetching(rnclist); - rptr = rnclist; - rnclist = rnclist->next; - if (rptr->rooms != NULL) free(rptr->rooms); - free(rptr); + citthread_mutex_lock(&RSSQueueMutex); + + it = GetNewHashPos(RSSQueueRooms, 0); + while (GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + (vrptr != NULL)) { + rptr = (rssnetcfg *)vrptr; + if (!rptr->Attached) rss_do_fetching(rptr); } + DeleteHashPos(&it); + citthread_mutex_unlock(&RSSQueueMutex); - CtdlLogPrintf(CTDL_DEBUG, "rssclient ended\n"); - last_run = time(NULL); + CtdlLogPrintf(CTDL_DEBUG, "rssclientscheduler ended\n"); doing_rssclient = 0; return; } +void RSSCleanup(void) +{ + citthread_mutex_destroy(&RSSQueueMutex); + DeleteHash(&RSSFetchUrls); + DeleteHash(&RSSQueueRooms); +} + CTDL_MODULE_INIT(rssclient) { if (threading) { + citthread_mutex_init(&RSSQueueMutex, NULL); + RSSQueueRooms = NewHash(1, Flathash); + RSSFetchUrls = NewHash(1, NULL); CtdlLogPrintf(CTDL_INFO, "%s\n", curl_version()); CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER); } diff --git a/citadel/modules/urldeshortener/serv_expand_shorter_urls.c b/citadel/modules/urldeshortener/serv_expand_shorter_urls.c index ea37d4034..3c3b11a2a 100644 --- a/citadel/modules/urldeshortener/serv_expand_shorter_urls.c +++ b/citadel/modules/urldeshortener/serv_expand_shorter_urls.c @@ -191,7 +191,7 @@ int SortConstStrByPosition(const void *Item1, const 
diff --git a/citadel/modules/urldeshortener/serv_expand_shorter_urls.c b/citadel/modules/urldeshortener/serv_expand_shorter_urls.c
index ea37d4034..3c3b11a2a 100644
--- a/citadel/modules/urldeshortener/serv_expand_shorter_urls.c
+++ b/citadel/modules/urldeshortener/serv_expand_shorter_urls.c
@@ -191,7 +191,7 @@ int SortConstStrByPosition(const void *Item1, const void *Item2)
     return -1;
 }
 
-HashList GetShorterUrls(StrBuf Message)
+HashList *GetShorterUrls(StrBuf *Message)
 {
     HashList *pUrls;
 
     /* we just suspect URL shorteners to be inside of feeds from twitter
@@ -204,7 +204,7 @@ HashList *GetShorterUrls(StrBuf *Message)
     CrawlMessageForShorterUrls(pUrls, Message);
 
     if (GetCount(pUrls) > 0)
-        return pURLs;
+        return pUrls;
     else
         return NULL;
-- 
2.30.2