Work on evented RSS client & libev+libcurl integration
authorWilfried Goesgens <dothebart@citadel.org>
Thu, 3 Nov 2011 23:23:27 +0000 (00:23 +0100)
committerWilfried Goesgens <dothebart@citadel.org>
Thu, 3 Nov 2011 23:23:27 +0000 (00:23 +0100)
  - finalize simplification of when to delete what
  - add more logging

citadel/context.c
citadel/event_client.c
citadel/modules/eventclient/serv_eventclient.c
citadel/modules/rssclient/rss_atom_parser.c
citadel/modules/rssclient/rss_atom_parser.h
citadel/modules/rssclient/serv_rssclient.c

index 0b433502e4fd4c9e234c4012eb643df2ea829d7f..dd51112ccbd09fa272d159cbc9081acbd4a39ee7 100644 (file)
@@ -345,7 +345,7 @@ void RemoveContext (CitContext *con)
                c = "WTF?";
        }
        syslog(LOG_DEBUG, "RemoveContext(%s) session %d", c, con->cs_pid);
-       cit_backtrace();
+///    cit_backtrace();
 
        /* Run any cleanup routines registered by loadable modules.
         * Note: We have to "become_session()" because the cleanup functions
index e360a124a96ca7f56fda6a4c16a58c7e1026d979..94777828a5974c6f9e7229a97c765b7a8531beb3 100644 (file)
@@ -116,7 +116,7 @@ void ShutDownDBCLient(AsyncIO *IO)
        CitContext *Ctx =IO->CitContext;
        become_session(Ctx);
 
-       EVM_syslog(LOG_DEBUG, "DBEVENT\n");
+       EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
        ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
 
        assert(IO->Terminate);
@@ -154,7 +154,9 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
                break;
        case eTerminateConnection:
        case eAbort:
-           ShutDownDBCLient(IO);
+               ev_idle_stop(event_db, &IO->db_unwind_stack);
+               ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
+               ShutDownDBCLient(IO);
        }
 }
 
index 2f9c5bc856aefc0e5ce67b13f2775e0d0670007e..153f1847ec1dcd79956bf67b03a8ce6a0d4a481c 100644 (file)
@@ -60,7 +60,7 @@
 
 ev_loop *event_base;
 
-long EvIDSource = 0;
+long EvIDSource = 1;
 /*****************************************************************************
  *                   libevent / curl integration                             *
  *****************************************************************************/
@@ -223,6 +223,7 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO)
         char *f;
         AsyncIO *IO = (AsyncIO*) vIO;
         CURLcode sta;
+       const char *Action;
 
        if (IO == NULL) {
                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
@@ -231,6 +232,7 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO)
                         return -1;
                 }
                 IO = (AsyncIO *) f;
+               EV_syslog(LOG_DEBUG, "EVCURL: got socket for URL: %s\n", IO->ConnectMe->PlainUrl);
                if (IO->SendBuf.fd != 0)
                {
                        ev_io_stop(event_base, &IO->recv_event);
@@ -241,7 +243,29 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO)
                ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
                curl_multi_assign(mhnd, fd, IO);
        }
-        EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
+
+       Action = "";
+       switch (action)
+       {
+       case CURL_POLL_NONE:
+                Action = "CURL_POLL_NONE";
+               break;
+       case CURL_POLL_REMOVE:
+                Action = "CURL_POLL_REMOVE";
+               break;
+       case CURL_POLL_IN:
+                Action = "CURL_POLL_IN";
+               break;
+       case CURL_POLL_OUT:
+                Action = "CURL_POLL_OUT";
+               break;
+       case CURL_POLL_INOUT:
+                Action = "CURL_POLL_INOUT";
+               break;
+        }
+
+
+        EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", (int)fd, Action, action);
 
        switch (action)
        {
@@ -329,7 +353,7 @@ int evcurl_init(AsyncIO *IO,
         OPT(NOSIGNAL, (long)1);
         OPT(FAILONERROR, (long)1);
         OPT(ENCODING, "");
-        OPT(FOLLOWLOCATION, (long)1);
+        OPT(FOLLOWLOCATION, (long)0);
         OPT(MAXREDIRS, (long)7);
         OPT(USERAGENT, CITADEL);
 
@@ -455,7 +479,7 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
        }
        DeleteHashPos(&It);
        DeleteHashContent(&q);
-       syslog(LOG_DEBUG, "EVENT Q Read done.\n");
+       syslog(LOG_DEBUG, "EVENT Q Add done.\n");
 }
 
 
@@ -583,7 +607,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
        }
        DeleteHashPos(&It);
        DeleteHashContent(&q);
-       syslog(LOG_DEBUG, "DBEVENT Q Read done.\n");
+       syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
 }
 
 
index d60d35ae2b0c1644ea51b4a77aa00a5afb5ab120..9bbeb8a1ffe26b4d5749f5f3a5fa9b52663eab67 100644 (file)
@@ -617,10 +617,17 @@ eNextState ParseRSSReply(AsyncIO *IO)
        long len;
        const char *Key;
 
+
+       if (IO->HttpReq.httpcode != 200)
+       {
+
+               EV_syslog(LOG_DEBUG, "need a 200, got a %ld !\n",
+                         IO->HttpReq.httpcode);
+// TODO: aide error message with rate limit
+               return eAbort;
+       }
+
        rssc = IO->Data;
-       pthread_mutex_lock(&RSSQueueMutex);
-       rssc->RefCount ++;
-       pthread_mutex_unlock(&RSSQueueMutex);
        ri = rssc->Item;
        rssc->CData = NewStrBufPlain(NULL, SIZ);
        rssc->Key = NewStrBuf();
@@ -649,10 +656,7 @@ eNextState ParseRSSReply(AsyncIO *IO)
        rssc->xp = XML_ParserCreateNS(ptr, ':');
        if (!rssc->xp) {
                syslog(LOG_DEBUG, "Cannot create XML parser!\n");
-               pthread_mutex_lock(&RSSQueueMutex);
-               rssc->RefCount --;
-               pthread_mutex_unlock(&RSSQueueMutex);
-               return eTerminateConnection;
+               return eAbort;
        }
        FlushStrBuf(rssc->Key);
 
index 9cb40219b44e62da59f72da84b7ee52ed9d30129..482c7c627c3e21b31d55e6e3988230bd439fb070 100644 (file)
@@ -72,7 +72,6 @@ struct rss_aggregator {
        AsyncIO          IO;
        XML_Parser       xp;
 
-       int              RefCount;
        int              ItemType;
        int              roomlist_parts;
 
index 7e5cb481dc338636f25bcb8f322db873f8f6f92b..24e3d6097f1c1c29aa5deb2001546e721a9e0dde 100644 (file)
@@ -208,7 +208,7 @@ eNextState RSSSaveMessage(AsyncIO *IO)
        cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
        
        if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
-               return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+               return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
        else
                return eAbort;
 }
@@ -229,7 +229,7 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
 #ifndef DEBUG_RSS
        if (cdbut != NULL) {
                /* Item has already been seen */
-               syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
+               EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
                cdb_free(cdbut);
 
                /* rewrite the record anyway, to update the timestamp */
@@ -238,7 +238,7 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
                          &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
 
                if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
-                       return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+                       return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
                else
                        return eAbort;
        }
@@ -287,6 +287,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
        int msglen = 0;
        StrBuf *Message;
        StrBuf *guid;
+       AsyncIO *IO = &Cfg->IO;
 
        int n;
    
@@ -314,7 +315,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
        }
 
        /* translate Item into message. */
-       syslog(LOG_DEBUG, "RSS: translating item...\n");
+       EVM_syslog(LOG_DEBUG, "RSS: translating item...\n");
        if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
        StrBufSpaceToBlank(ri->description);
        msg = malloc(sizeof(struct CtdlMessage));
@@ -456,7 +457,6 @@ int rss_do_fetching(rss_aggregator *Cfg)
 
        if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
                return 0;
-       Cfg->RefCount++;
 
        ri = (rss_item*) malloc(sizeof(rss_item));
        memset(ri, 0, sizeof(rss_item));
@@ -489,6 +489,8 @@ int rss_do_fetching(rss_aggregator *Cfg)
 void DeleteRssCfg(void *vptr)
 {
        rss_aggregator *rncptr = (rss_aggregator *)vptr;
+       AsyncIO *IO = &rncptr->IO;
+       EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
 
        FreeStrBuf(&rncptr->Url);
        FreeStrBuf(&rncptr->rooms);
@@ -498,6 +500,12 @@ void DeleteRssCfg(void *vptr)
        DeleteHash(&rncptr->OtherQRnumbers);
        FreeURL(&rncptr->IO.ConnectMe);
 
+       DeleteHashPos (&rncptr->Pos);
+       DeleteHash (&rncptr->Messages);
+       if (rncptr->recp.recp_room != NULL)
+               free(rncptr->recp.recp_room);
+
+
        if (rncptr->Item != NULL)
        {
                FreeStrBuf(&rncptr->Item->guid);
@@ -520,33 +528,11 @@ void DeleteRssCfg(void *vptr)
 eNextState RSSAggregatorTerminate(AsyncIO *IO)
 {
        rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
-       
-       HashPos *At;
-       long HKLen;
-       const char *HK;
-       void *vData;
 
-       pthread_mutex_lock(&RSSQueueMutex);
-       rncptr->RefCount --;
-       if (rncptr->RefCount == 0)
-       {
-               UnlinkRSSAggregator(rncptr);
-
-       }
-       pthread_mutex_unlock(&RSSQueueMutex);
+       EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
 
-       At = GetNewHashPos(RSSFetchUrls, 0);
 
-       pthread_mutex_lock(&RSSQueueMutex);
-       GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At);
-       GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData);
-       DeleteEntryFromHash(RSSFetchUrls, At);
-       pthread_mutex_unlock(&RSSQueueMutex);
-       DeleteHashPos (&rncptr->Pos);
-       DeleteHash (&rncptr->Messages);
-       if (rncptr->recp.recp_room != NULL)
-               free(rncptr->recp.recp_room);
-       DeleteHashPos(&At);
+       UnlinkRSSAggregator(rncptr);
        return eAbort;
 }
 
@@ -648,29 +634,20 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
                    use_this_rncptr = (rss_aggregator *)vptr;
                    if (use_this_rncptr != NULL)
                    {
-                           /* mustn't attach to an active session */
-                           if (use_this_rncptr->RefCount > 0)
-                           {
-                                   DeleteRssCfg(rncptr);
-                                   Count->count--;
-                           }
-                           else 
+                           long *QRnumber;
+                           StrBufAppendBufPlain(use_this_rncptr->rooms, 
+                                                qrbuf->QRname, 
+                                                -1, 0);
+                           if (use_this_rncptr->roomlist_parts == 1)
                            {
-                                   long *QRnumber;
-                                   StrBufAppendBufPlain(use_this_rncptr->rooms, 
-                                                        qrbuf->QRname, 
-                                                        -1, 0);
-                                   if (use_this_rncptr->roomlist_parts == 1)
-                                   {
-                                           use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
-                                   }
-                                   QRnumber = (long*)malloc(sizeof(long));
-                                   *QRnumber = qrbuf->QRnumber;
-                                   Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
-                                   use_this_rncptr->roomlist_parts++;
+                                   use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
                            }
-                           pthread_mutex_unlock(&RSSQueueMutex);
+                           QRnumber = (long*)malloc(sizeof(long));
+                           *QRnumber = qrbuf->QRnumber;
+                           Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
+                           use_this_rncptr->roomlist_parts++;
 
+                           pthread_mutex_unlock(&RSSQueueMutex);
 
                            FreeStrBuf(&rncptr->Url);
                            free(rncptr);
@@ -727,6 +704,8 @@ void rssclient_scan(void) {
         */
        if (doing_rssclient) return;
        doing_rssclient = 1;
+       if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
+               return;
 
        syslog(LOG_DEBUG, "rssclient started\n");
        CtdlForEachRoom(rssclient_scan_room, NULL);
@@ -738,9 +717,8 @@ void rssclient_scan(void) {
               GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && 
               (vrptr != NULL)) {
                rptr = (rss_aggregator *)vrptr;
-               if (rptr->RefCount == 0) 
-                       if (!rss_do_fetching(rptr))
-                               UnlinkRSSAggregator(rptr);
+               if (!rss_do_fetching(rptr))
+                       UnlinkRSSAggregator(rptr);
        }
        DeleteHashPos(&it);
        pthread_mutex_unlock(&RSSQueueMutex);