finalize RSS/curlev implementation
authorWilfried Goesgens <dothebart@citadel.org>
Mon, 24 Oct 2011 22:19:58 +0000 (00:19 +0200)
committerWilfried Goesgens <dothebart@citadel.org>
Mon, 24 Oct 2011 22:19:58 +0000 (00:19 +0200)
  - properly adjust the refcount to RSS Configs, so we delete it in time...
  - split in & out handlers
  - correctly detect 30x and stop/reinit/start the evio handlers

citadel/modules/eventclient/serv_eventclient.c
citadel/modules/rssclient/rss_atom_parser.c
citadel/modules/rssclient/serv_rssclient.c

index ce2dfa79cbc2965c503158e23bb26b797f6dcbb4..aa42fe75478f2d9df080c51c60c9a5c9d03b83d1 100644 (file)
@@ -83,14 +83,14 @@ ev_async WakeupCurl;
 evcurl_global_data global;
 
 static void
-gotstatus(evcurl_global_data *global, int nnrun) 
+gotstatus(int nnrun) 
 {
         CURLM *mhnd;
         CURLMsg *msg;
         int nmsg;
 
-        global->nrun = nnrun;
-        mhnd = global->mhnd;
+        global.nrun = nnrun;
+        mhnd = global.mhnd;
 
         syslog(LOG_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
         while ((msg = curl_multi_info_read(mhnd, &nmsg))) {
@@ -110,7 +110,8 @@ gotstatus(evcurl_global_data *global, int nnrun)
                                 syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
                         IO = (AsyncIO *)chandle;
                         
-/////                        ev_io_stop(event_base, &IO->recv_event);
+                        ev_io_stop(event_base, &IO->recv_event);
+                        ev_io_stop(event_base, &IO->send_event);
 
                         sta = msg->data.result;
                         if (sta) {
@@ -143,33 +144,35 @@ gotstatus(evcurl_global_data *global, int nnrun)
 }
 
 static void
-stepmulti(evcurl_global_data *global, curl_socket_t fd) {
-        int nnrun;
+stepmulti(void *data, curl_socket_t fd, int which)
+{
+        int running_handles = 0;
         CURLMcode msta;
         
-        if (global == NULL) {
-            syslog(LOG_DEBUG, "EVCURL: stepmulti(NULL): wtf?\n");
-            return;
-        }
-        msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun);
+        msta = curl_multi_socket_action(global.mhnd, fd, which, &running_handles);
         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
         if (msta)
                 syslog(LOG_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta));
-        if (global->nrun != nnrun)
-                gotstatus(global, nnrun);
+        if (global.nrun != running_handles)
+                gotstatus(running_handles);
 }
 
 static void
 gottime(struct ev_loop *loop, ev_timer *timeev, int events) {
         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
-        evcurl_global_data *global = (void *)timeev->data;
-        stepmulti(global, CURL_SOCKET_TIMEOUT);
+        stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
 }
 
 static void
-gotio(struct ev_loop *loop, ev_io *ioev, int events) {
+got_in(struct ev_loop *loop, ev_io *ioev, int events) {
         syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
-        stepmulti(&global, ioev->fd);
+        stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
+}
+
+static void
+got_out(struct ev_loop *loop, ev_io *ioev, int events) {
+        syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
+        stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
 }
 
 static size_t
@@ -197,36 +200,55 @@ gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
 }
 
 static int
-gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vio) {
+gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) {
         evcurl_global_data *global = cglobal;
         CURLM *mhnd = global->mhnd;
         char *f;
-        AsyncIO *IO = (AsyncIO*) vio;
+        AsyncIO *IO = (AsyncIO*) vIO;
         CURLcode sta;
 
         syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
-
-        if (IO == NULL) {
-                syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n");
+       if (IO == NULL) {
                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
                 if (sta) {
                         syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
                         return -1;
                 }
                 IO = (AsyncIO *) f;
-                ev_init(&IO->recv_event, &gotio);
-                curl_multi_assign(mhnd, fd, IO);
-        }
-        if (action == CURL_POLL_REMOVE) {
+               if (IO->SendBuf.fd != 0)
+               {
+                       ev_io_stop(event_base, &IO->recv_event);
+                       ev_io_stop(event_base, &IO->send_event);
+               }
+               IO->SendBuf.fd = fd;
+               ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
+               ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
+               curl_multi_assign(mhnd, fd, IO);
+
+       }
+
+       switch (action)
+       {
+       case CURL_POLL_NONE:
+                syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n");
+               break;
+       case CURL_POLL_REMOVE:
                 syslog(LOG_ERR,"EVCURL: called last time to unregister this sockwatcher\n");
                 ev_io_stop(event_base, &IO->recv_event);
-        } else {
-                int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0);
+                ev_io_stop(event_base, &IO->send_event);
+               break;
+       case CURL_POLL_IN:
                 ev_io_stop(event_base, &IO->recv_event);
-                if (events) {
-                        ev_io_set(&IO->recv_event, fd, events);
-                        ev_io_start(event_base, &IO->recv_event);
-                }
+                ev_io_start(event_base, &IO->send_event);
+               break;
+       case CURL_POLL_OUT:
+                ev_io_stop(event_base, &IO->send_event);
+                ev_io_start(event_base, &IO->recv_event);
+               break;
+       case CURL_POLL_INOUT:
+                ev_io_start(event_base, &IO->send_event);
+                ev_io_start(event_base, &IO->recv_event);
+               break;
         }
         return 0;
 }
@@ -348,7 +370,7 @@ eNextState
 evcurl_handle_start(AsyncIO *IO) 
 {
        CURLMcode msta;
-       
+       IO->NextState = eConnect;
        syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
        msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
        if (msta)
index a1f9b512d2a58f998987fefe3307ff7dcb37e049..7f1d612d624dbac9ff667d7ce454fd7d2a113fec 100644 (file)
@@ -58,6 +58,8 @@
 #include "event_client.h"
 #include "rss_atom_parser.h"
 
+extern pthread_mutex_t RSSQueueMutex;
+
 HashList *StartHandlers = NULL;
 HashList *EndHandlers = NULL;
 HashList *KnownNameSpaces = NULL;
@@ -614,6 +616,9 @@ eNextState ParseRSSReply(AsyncIO *IO)
        long len;
 
        rssc = IO->Data;
+       pthread_mutex_lock(&RSSQueueMutex);
+       rssc->RefCount ++;
+       pthread_mutex_unlock(&RSSQueueMutex);
        ri = rssc->Item;
        rssc->CData = NewStrBufPlain(NULL, SIZ);
        rssc->Key = NewStrBuf();
@@ -675,6 +680,9 @@ shutdown:
 
         ///Cfg->next_poll = time(NULL) + config.c_net_freq; 
 
+       pthread_mutex_lock(&RSSQueueMutex);
+       rssc->RefCount --;
+       pthread_mutex_unlock(&RSSQueueMutex);
        return eTerminateConnection;
 }
 
index e2e9aea12ec9bf8db65213a8b787012234485dab..c125f61748aa956a38d48a3915135e1e737db01e 100644 (file)
@@ -240,6 +240,11 @@ void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf
 {
        networker_save_message *Ctx;
 
+       pthread_mutex_lock(&RSSQueueMutex);
+       Cfg->RefCount ++;
+       pthread_mutex_unlock(&RSSQueueMutex);
+
+
        Ctx = (networker_save_message *) malloc(sizeof(networker_save_message));
        memset(Ctx, 0, sizeof(networker_save_message));
        
@@ -434,7 +439,7 @@ int rss_do_fetching(rss_aggregator *Cfg)
 
        if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
                return 0;
-       Cfg->RefCount = 1;
+       Cfg->RefCount++;
 
        ri = (rss_item*) malloc(sizeof(rss_item));
        memset(ri, 0, sizeof(rss_item));