From fd29e13821d3aa616b03d447f92cfb2de112b31d Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Tue, 25 Oct 2011 00:19:58 +0200 Subject: [PATCH] finalize RSS/curlev implementation - properly adjust the refcount to RSS Configs, so we delete it in time... - split in & out handlers - correctly detect 30x and stop/reinit/start the evio handlers --- .../modules/eventclient/serv_eventclient.c | 88 ++++++++++++------- citadel/modules/rssclient/rss_atom_parser.c | 8 ++ citadel/modules/rssclient/serv_rssclient.c | 7 +- 3 files changed, 69 insertions(+), 34 deletions(-) diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index ce2dfa79c..aa42fe754 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -83,14 +83,14 @@ ev_async WakeupCurl; evcurl_global_data global; static void -gotstatus(evcurl_global_data *global, int nnrun) +gotstatus(int nnrun) { CURLM *mhnd; CURLMsg *msg; int nmsg; - global->nrun = nnrun; - mhnd = global->mhnd; + global.nrun = nnrun; + mhnd = global.mhnd; syslog(LOG_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n"); while ((msg = curl_multi_info_read(mhnd, &nmsg))) { @@ -110,7 +110,8 @@ gotstatus(evcurl_global_data *global, int nnrun) syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta)); IO = (AsyncIO *)chandle; -///// ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); sta = msg->data.result; if (sta) { @@ -143,33 +144,35 @@ gotstatus(evcurl_global_data *global, int nnrun) } static void -stepmulti(evcurl_global_data *global, curl_socket_t fd) { - int nnrun; +stepmulti(void *data, curl_socket_t fd, int which) +{ + int running_handles = 0; CURLMcode msta; - if (global == NULL) { - syslog(LOG_DEBUG, "EVCURL: stepmulti(NULL): wtf?\n"); - return; - } - msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun); + msta = curl_multi_socket_action(global.mhnd, fd, which, &running_handles); syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n"); if (msta) syslog(LOG_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta)); - if (global->nrun != nnrun) - gotstatus(global, nnrun); + if (global.nrun != running_handles) + gotstatus(running_handles); } static void gottime(struct ev_loop *loop, ev_timer *timeev, int events) { syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n"); - evcurl_global_data *global = (void *)timeev->data; - stepmulti(global, CURL_SOCKET_TIMEOUT); + stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0); } static void -gotio(struct ev_loop *loop, ev_io *ioev, int events) { +got_in(struct ev_loop *loop, ev_io *ioev, int events) { syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd); - stepmulti(&global, ioev->fd); + stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN); +} + +static void +got_out(struct ev_loop *loop, ev_io *ioev, int events) { + syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd); + stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT); } static size_t @@ -197,36 +200,55 @@ gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) { } static int -gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vio) { +gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) { evcurl_global_data *global = cglobal; CURLM *mhnd = global->mhnd; char *f; - AsyncIO *IO = (AsyncIO*) vio; + AsyncIO *IO = (AsyncIO*) vIO; CURLcode sta; syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action); - - if (IO == NULL) { - syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n"); + if (IO == NULL) { sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); if (sta) { syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta)); return -1; } IO = (AsyncIO *) f; - ev_init(&IO->recv_event, &gotio); - curl_multi_assign(mhnd, fd, IO); - } - if (action == CURL_POLL_REMOVE) { + if (IO->SendBuf.fd != 0) + { + ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); + } + IO->SendBuf.fd = fd; + ev_io_init(&IO->recv_event, &got_in, fd, EV_READ); + ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE); + curl_multi_assign(mhnd, fd, IO); + + } + + switch (action) + { + case CURL_POLL_NONE: + syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n"); + break; + case CURL_POLL_REMOVE: syslog(LOG_ERR,"EVCURL: called last time to unregister this sockwatcher\n"); ev_io_stop(event_base, &IO->recv_event); - } else { - int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0); + ev_io_stop(event_base, &IO->send_event); + break; + case CURL_POLL_IN: ev_io_stop(event_base, &IO->recv_event); - if (events) { - ev_io_set(&IO->recv_event, fd, events); - ev_io_start(event_base, &IO->recv_event); - } + ev_io_start(event_base, &IO->send_event); + break; + case CURL_POLL_OUT: + ev_io_stop(event_base, &IO->send_event); + ev_io_start(event_base, &IO->recv_event); + break; + case CURL_POLL_INOUT: + ev_io_start(event_base, &IO->send_event); + ev_io_start(event_base, &IO->recv_event); + break; } return 0; } @@ -348,7 +370,7 @@ eNextState evcurl_handle_start(AsyncIO *IO) { CURLMcode msta; - + IO->NextState = eConnect; syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n"); msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd); if (msta) diff --git a/citadel/modules/rssclient/rss_atom_parser.c b/citadel/modules/rssclient/rss_atom_parser.c index a1f9b512d..7f1d612d6 100644 --- a/citadel/modules/rssclient/rss_atom_parser.c +++ b/citadel/modules/rssclient/rss_atom_parser.c @@ -58,6 +58,8 @@ #include "event_client.h" #include "rss_atom_parser.h" +extern pthread_mutex_t RSSQueueMutex; + HashList *StartHandlers = NULL; HashList *EndHandlers = NULL; HashList *KnownNameSpaces = NULL; @@ -614,6 +616,9 @@ eNextState ParseRSSReply(AsyncIO *IO) long len; rssc = IO->Data; + pthread_mutex_lock(&RSSQueueMutex); + rssc->RefCount ++; + pthread_mutex_unlock(&RSSQueueMutex); ri = rssc->Item; rssc->CData = NewStrBufPlain(NULL, SIZ); rssc->Key = NewStrBuf(); @@ -675,6 +680,9 @@ shutdown: ///Cfg->next_poll = time(NULL) + config.c_net_freq; + pthread_mutex_lock(&RSSQueueMutex); + rssc->RefCount --; + pthread_mutex_unlock(&RSSQueueMutex); return eTerminateConnection; } diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index e2e9aea12..c125f6174 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -240,6 +240,11 @@ void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf { networker_save_message *Ctx; + pthread_mutex_lock(&RSSQueueMutex); + Cfg->RefCount ++; + pthread_mutex_unlock(&RSSQueueMutex); + + Ctx = (networker_save_message *) malloc(sizeof(networker_save_message)); memset(Ctx, 0, sizeof(networker_save_message)); @@ -434,7 +439,7 @@ int rss_do_fetching(rss_aggregator *Cfg) if ((Cfg->next_poll != 0) && (now < Cfg->next_poll)) return 0; - Cfg->RefCount = 1; + Cfg->RefCount++; ri = (rss_item*) malloc(sizeof(rss_item)); memset(ri, 0, sizeof(rss_item)); -- 2.30.2