Work on evented RSS client & libev+libcurl integration
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
index 7e5cb481dc338636f25bcb8f322db873f8f6f92b..24e3d6097f1c1c29aa5deb2001546e721a9e0dde 100644 (file)
@@ -208,7 +208,7 @@ eNextState RSSSaveMessage(AsyncIO *IO)
        cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
        
        if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
-               return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+               return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
        else
                return eAbort;
 }
@@ -229,7 +229,7 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
 #ifndef DEBUG_RSS
        if (cdbut != NULL) {
                /* Item has already been seen */
-               syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
+               EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
                cdb_free(cdbut);
 
                /* rewrite the record anyway, to update the timestamp */
@@ -238,7 +238,7 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
                          &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
 
                if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
-                       return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+                       return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
                else
                        return eAbort;
        }
@@ -287,6 +287,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
        int msglen = 0;
        StrBuf *Message;
        StrBuf *guid;
+       AsyncIO *IO = &Cfg->IO;
 
        int n;
    
@@ -314,7 +315,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
        }
 
        /* translate Item into message. */
-       syslog(LOG_DEBUG, "RSS: translating item...\n");
+       EVM_syslog(LOG_DEBUG, "RSS: translating item...\n");
        if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY(""));
        StrBufSpaceToBlank(ri->description);
        msg = malloc(sizeof(struct CtdlMessage));
@@ -456,7 +457,6 @@ int rss_do_fetching(rss_aggregator *Cfg)
 
        if ((Cfg->next_poll != 0) && (now < Cfg->next_poll))
                return 0;
-       Cfg->RefCount++;
 
        ri = (rss_item*) malloc(sizeof(rss_item));
        memset(ri, 0, sizeof(rss_item));
@@ -489,6 +489,8 @@ int rss_do_fetching(rss_aggregator *Cfg)
 void DeleteRssCfg(void *vptr)
 {
        rss_aggregator *rncptr = (rss_aggregator *)vptr;
+       AsyncIO *IO = &rncptr->IO;
+       EVM_syslog(LOG_DEBUG, "RSS: destroying\n");
 
        FreeStrBuf(&rncptr->Url);
        FreeStrBuf(&rncptr->rooms);
@@ -498,6 +500,12 @@ void DeleteRssCfg(void *vptr)
        DeleteHash(&rncptr->OtherQRnumbers);
        FreeURL(&rncptr->IO.ConnectMe);
 
+       DeleteHashPos (&rncptr->Pos);
+       DeleteHash (&rncptr->Messages);
+       if (rncptr->recp.recp_room != NULL)
+               free(rncptr->recp.recp_room);
+
+
        if (rncptr->Item != NULL)
        {
                FreeStrBuf(&rncptr->Item->guid);
@@ -520,33 +528,11 @@ void DeleteRssCfg(void *vptr)
 eNextState RSSAggregatorTerminate(AsyncIO *IO)
 {
        rss_aggregator *rncptr = (rss_aggregator *)IO->Data;
-       
-       HashPos *At;
-       long HKLen;
-       const char *HK;
-       void *vData;
 
-       pthread_mutex_lock(&RSSQueueMutex);
-       rncptr->RefCount --;
-       if (rncptr->RefCount == 0)
-       {
-               UnlinkRSSAggregator(rncptr);
-
-       }
-       pthread_mutex_unlock(&RSSQueueMutex);
+       EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
 
-       At = GetNewHashPos(RSSFetchUrls, 0);
 
-       pthread_mutex_lock(&RSSQueueMutex);
-       GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At);
-       GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData);
-       DeleteEntryFromHash(RSSFetchUrls, At);
-       pthread_mutex_unlock(&RSSQueueMutex);
-       DeleteHashPos (&rncptr->Pos);
-       DeleteHash (&rncptr->Messages);
-       if (rncptr->recp.recp_room != NULL)
-               free(rncptr->recp.recp_room);
-       DeleteHashPos(&At);
+       UnlinkRSSAggregator(rncptr);
        return eAbort;
 }
 
@@ -648,29 +634,20 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
                    use_this_rncptr = (rss_aggregator *)vptr;
                    if (use_this_rncptr != NULL)
                    {
-                           /* mustn't attach to an active session */
-                           if (use_this_rncptr->RefCount > 0)
-                           {
-                                   DeleteRssCfg(rncptr);
-                                   Count->count--;
-                           }
-                           else 
+                           long *QRnumber;
+                           StrBufAppendBufPlain(use_this_rncptr->rooms, 
+                                                qrbuf->QRname, 
+                                                -1, 0);
+                           if (use_this_rncptr->roomlist_parts == 1)
                            {
-                                   long *QRnumber;
-                                   StrBufAppendBufPlain(use_this_rncptr->rooms, 
-                                                        qrbuf->QRname, 
-                                                        -1, 0);
-                                   if (use_this_rncptr->roomlist_parts == 1)
-                                   {
-                                           use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
-                                   }
-                                   QRnumber = (long*)malloc(sizeof(long));
-                                   *QRnumber = qrbuf->QRnumber;
-                                   Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
-                                   use_this_rncptr->roomlist_parts++;
+                                   use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash);
                            }
-                           pthread_mutex_unlock(&RSSQueueMutex);
+                           QRnumber = (long*)malloc(sizeof(long));
+                           *QRnumber = qrbuf->QRnumber;
+                           Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
+                           use_this_rncptr->roomlist_parts++;
 
+                           pthread_mutex_unlock(&RSSQueueMutex);
 
                            FreeStrBuf(&rncptr->Url);
                            free(rncptr);
@@ -727,6 +704,8 @@ void rssclient_scan(void) {
         */
        if (doing_rssclient) return;
        doing_rssclient = 1;
+       if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0))
+               return;
 
        syslog(LOG_DEBUG, "rssclient started\n");
        CtdlForEachRoom(rssclient_scan_room, NULL);
@@ -738,9 +717,8 @@ void rssclient_scan(void) {
               GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && 
               (vrptr != NULL)) {
                rptr = (rss_aggregator *)vrptr;
-               if (rptr->RefCount == 0) 
-                       if (!rss_do_fetching(rptr))
-                               UnlinkRSSAggregator(rptr);
+               if (!rss_do_fetching(rptr))
+                       UnlinkRSSAggregator(rptr);
        }
        DeleteHashPos(&it);
        pthread_mutex_unlock(&RSSQueueMutex);