EV: fix possible null pointer access in last commit.
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
index c2d95d34a88de0e68b0510524129fcfccafa16ba..d15613d4907d591f0b5fa98d376b9bcbb0533e92 100644 (file)
@@ -93,6 +93,28 @@ int RSSClientDebugEnabled = 0;
        DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT,           \
                             IO->ID, N)
 
+typedef enum _RSSState { /* stages an RSS aggregator reports; each value is used as an index into RSSStates[] */
+       eRSSCreated, /* index 0: paired with "Aggregator created" */
+       eRSSFetching, /* index 1: paired with "Fetching content" */
+       eRSSFailure, /* index 2: paired with "Failed" */
+       eRSSParsing, /* index 3: paired with "parsing content" */
+       eRSSUT /* index 4: paired with "checking usetable" */
+} RSSState;
+ConstStr RSSStates[] = { /* display text per RSSState, looked up by enum value; entry order must match the enum */
+       {HKEY("Aggregator created")}, /* eRSSCreated */
+       {HKEY("Fetching content")}, /* eRSSFetching */
+       {HKEY("Failed")}, /* eRSSFailure */
+       {HKEY("parsing content")}, /* eRSSParsing */
+       {HKEY("checking usetable")} /* eRSSUT */
+};
+
+static void SetRSSState(AsyncIO *IO, RSSState State) /* copy the text for State into the cs_clientname of IO's CitContext, if one is attached */
+{
+       CitContext* CCC = IO->CitContext; /* IO itself is dereferenced unchecked; callers must pass a non-NULL IO */
+       if (CCC != NULL) /* no context attached: silently do nothing */
+               memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1); /* State is not range-checked; the +1 presumably carries the terminating NUL (assumes len excludes it - confirm against HKEY) */
+}
+
 void DeleteRoomReference(long QRnumber)
 {
        HashPos *At;
@@ -164,7 +186,9 @@ void DeleteRssCfg(void *vptr)
 {
        rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
        AsyncIO *IO = &RSSAggr->IO;
-       EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
+
+       if (IO->CitContext != NULL)
+               EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
 
        FreeStrBuf(&RSSAggr->Url);
        FreeStrBuf(&RSSAggr->rooms);
@@ -369,10 +393,8 @@ eNextState RSSSaveMessage(AsyncIO *IO)
        CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
 
        /* write the uidl to the use table so we don't store this item again */
-       cdb_store(CDB_USETABLE,
-                 SKEY(RSSAggr->ThisMsg->MsgGUID),
-                 &RSSAggr->ThisMsg->ut,
-                 sizeof(struct UseTable) );
+
+       CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, IO->Now, 0, eWrite, IO->ID, CCID);
 
        if (GetNextHashPos(RSSAggr->Messages,
                           RSSAggr->Pos,
@@ -387,27 +409,25 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
 {
        const char *Key;
        long len;
-       struct cdbdata *cdbut;
        rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
 
        /* Find out if we've already seen this item */
-       strcpy(Ctx->ThisMsg->ut.ut_msgid,
-              ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
-       Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
-
-       cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
+// todo: expiry?
 #ifndef DEBUG_RSS
-       if (cdbut != NULL) {
+       SetRSSState(IO, eRSSUT);
+       if (CheckIfAlreadySeen("RSS Item Seen",
+                              Ctx->ThisMsg->MsgGUID,
+                              IO->Now,
+                              IO->Now - USETABLE_ANTIEXPIRE,
+                              eCheckUpdate,
+                              IO->ID, CCID)
+           != 0)
+       {
                /* Item has already been seen */
                EVRSSC_syslog(LOG_DEBUG,
                          "%s has already been seen\n",
                          ChrPtr(Ctx->ThisMsg->MsgGUID));
-               cdb_free(cdbut);
-
-               /* rewrite the record anyway, to update the timestamp */
-               cdb_store(CDB_USETABLE,
-                         SKEY(Ctx->ThisMsg->MsgGUID),
-                         &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
+               SetRSSState(IO, eRSSParsing);
 
                if (GetNextHashPos(Ctx->Messages,
                                   Ctx->Pos,
@@ -422,18 +442,19 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
        else
 #endif
        {
+               SetRSSState(IO, eRSSParsing);
+
                NextDBOperation(IO, RSSSaveMessage);
                return eSendMore;
        }
+       return eSendMore;
 }
 
 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
 {
-       struct UseTable ut;
        u_char rawdigest[MD5_DIGEST_LEN];
        struct MD5Context md5context;
        StrBuf *guid;
-       struct cdbdata *cdbut;
        rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
 
        if (IO->HttpReq.httpcode != 200)
@@ -442,6 +463,7 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
                long lens[2];
                const char *strs[2];
 
+               SetRSSState(IO, eRSSFailure);
                ErrMsg = NewStrBuf();
                EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n",
                              IO->HttpReq.httpcode);
@@ -462,10 +484,18 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
                CtdlAideFPMessage(
                        ChrPtr(ErrMsg),
                        "RSS Aggregation run failure",
-                       2, strs, (long*) &lens);
+                       2, strs, (long*) &lens,
+                       IO->Now,
+                       IO->ID, CCID);
+               
                FreeStrBuf(&ErrMsg);
+               EVRSSC_syslog(LOG_DEBUG,
+                             "RSS feed returned an invalid http status code. <%s><HTTP %ld>\n",
+                             ChrPtr(Ctx->Url),
+                             IO->HttpReq.httpcode);
                return eAbort;
        }
+       SetRSSState(IO, eRSSUT);
 
        MD5Init(&md5context);
 
@@ -483,26 +513,25 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
        if (StrLength(guid) > 40)
                StrBufCutAt(guid, 40, NULL);
        /* Find out if we've already seen this item */
-       memcpy(ut.ut_msgid, SKEY(guid));
-       ut.ut_timestamp = time(NULL);
 
-       cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid));
 #ifndef DEBUG_RSS
-       if (cdbut != NULL) {
-               /* Item has already been seen */
-               EVRSSC_syslog(LOG_DEBUG,
-                             "%s has already been seen\n",
-                             ChrPtr(Ctx->Url));
-               cdb_free(cdbut);
-       }
 
-       /* rewrite the record anyway, to update the timestamp */
-       cdb_store(CDB_USETABLE,
-                 SKEY(guid),
-                 &ut, sizeof(struct UseTable) );
+       if (CheckIfAlreadySeen("RSS Whole",
+                              guid,
+                              IO->Now,
+                              IO->Now - USETABLE_ANTIEXPIRE,
+                              eCheckUpdate,
+                              IO->ID, CCID)
+           != 0)
+       {
+               FreeStrBuf(&guid);
+
+               EVRSSC_syslog(LOG_DEBUG, "RSS feed already seen. <%s>\n", ChrPtr(Ctx->Url));
+               return eAbort;
+       }
        FreeStrBuf(&guid);
-       if (cdbut != NULL) return eAbort;
 #endif
+       SetRSSState(IO, eRSSParsing);
        return RSSAggregator_ParseReply(IO);
 }
 
@@ -540,6 +569,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr)
                EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
                return 0;
        }
+       SetRSSState(IO, eRSSCreated);
 
        safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
                    ChrPtr(RSSAggr->Url),
@@ -549,6 +579,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr)
        ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
        CurlPrepareURL(RSSAggr->IO.ConnectMe);
 
+       SetRSSState(IO, eRSSFetching);
        QueueCurlContext(&RSSAggr->IO);
        return 1;
 }
@@ -559,7 +590,6 @@ int rss_do_fetching(rss_aggregator *RSSAggr)
 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
 {
        const RoomNetCfgLine *pLine;
-       rss_room_counter *Count = NULL;
        rss_aggregator *RSSAggr = NULL;
        rss_aggregator *use_this_RSSAggr = NULL;
        void *vptr;
@@ -578,24 +608,23 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneR
 
        if (server_shutting_down) return;
 
-       pLine = OneRNCFG->NetConfigs[pop3client];
+       pLine = OneRNCFG->NetConfigs[rssclient];
 
        while (pLine != NULL)
        {
-               if (Count == NULL)
-               {
-                       Count = malloc(
-                               sizeof(rss_room_counter));
-                       Count->count = 0;
-               }
-               Count->count ++;
+               const char *lPtr = NULL;
+
                RSSAggr = (rss_aggregator *) malloc(
                        sizeof(rss_aggregator));
 
                memset (RSSAggr, 0, sizeof(rss_aggregator));
                RSSAggr->QRnumber = qrbuf->QRnumber;
                RSSAggr->roomlist_parts = 1;
-               RSSAggr->Url = NewStrBufDup(pLine->Value[1]);
+               RSSAggr->Url = NewStrBufPlain(NULL, StrLength(pLine->Value[0]));
+               StrBufExtract_NextToken(RSSAggr->Url,
+                                       pLine->Value[0],
+                                       &lPtr,
+                                       '|');
 
                pthread_mutex_lock(&RSSQueueMutex);
                GetHash(RSSFetchUrls,
@@ -628,6 +657,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneR
                        FreeStrBuf(&RSSAggr->Url);
                        free(RSSAggr);
                        RSSAggr = NULL;
+                       pLine = pLine->next;
                        continue;
                }
                pthread_mutex_unlock(&RSSQueueMutex);
@@ -645,6 +675,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneR
                    DeleteRssCfg);
 
                pthread_mutex_unlock(&RSSQueueMutex);
+               pLine = pLine->next;
        }
 }
 
@@ -688,21 +719,29 @@ void rssclient_scan(void) {
        }
 
        become_session(&rss_CC);
-       EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
+       EVRSSQM_syslog(LOG_DEBUG, "rssclient started");
        CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient);
 
-       pthread_mutex_lock(&RSSQueueMutex);
-
-       it = GetNewHashPos(RSSFetchUrls, 0);
-       while (!server_shutting_down &&
-              GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
-              (vrptr != NULL)) {
-               rptr = (rss_aggregator *)vrptr;
-               if (!rss_do_fetching(rptr))
-                       UnlinkRSSAggregator(rptr);
+       if (GetCount(RSSFetchUrls) > 0)
+       {
+               pthread_mutex_lock(&RSSQueueMutex);
+               EVRSSQ_syslog(LOG_DEBUG,
+                              "rssclient starting %d Clients",
+                              GetCount(RSSFetchUrls));
+               
+               it = GetNewHashPos(RSSFetchUrls, 0);
+               while (!server_shutting_down &&
+                      GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
+                      (vrptr != NULL)) {
+                       rptr = (rss_aggregator *)vrptr;
+                       if (!rss_do_fetching(rptr))
+                               UnlinkRSSAggregator(rptr);
+               }
+               DeleteHashPos(&it);
+               pthread_mutex_unlock(&RSSQueueMutex);
        }
-       DeleteHashPos(&it);
-       pthread_mutex_unlock(&RSSQueueMutex);
+       else
+               EVRSSQM_syslog(LOG_DEBUG, "Nothing to do.");
 
        EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
        return;