SEEN-Database: refactor database interface for remembering whether we already aggrega...
[citadel.git] / citadel / modules / rssclient / serv_rssclient.c
index 29c3efea235e6ce95480198b6037cf20c68ed7cc..7cc8160fbb09fe172e8f4bbec9224c4560bc1c7f 100644 (file)
@@ -164,7 +164,9 @@ void DeleteRssCfg(void *vptr)
 {
        rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
        AsyncIO *IO = &RSSAggr->IO;
-       EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
+
+       if (IO->CitContext != NULL)
+               EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
 
        FreeStrBuf(&RSSAggr->Url);
        FreeStrBuf(&RSSAggr->rooms);
@@ -369,10 +371,8 @@ eNextState RSSSaveMessage(AsyncIO *IO)
        CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
 
        /* write the uidl to the use table so we don't store this item again */
-       cdb_store(CDB_USETABLE,
-                 SKEY(RSSAggr->ThisMsg->MsgGUID),
-                 &RSSAggr->ThisMsg->ut,
-                 sizeof(struct UseTable) );
+
+       CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, IO->Now, 0, eWrite, IO->ID, CCID);
 
        if (GetNextHashPos(RSSAggr->Messages,
                           RSSAggr->Pos,
@@ -387,27 +387,23 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
 {
        const char *Key;
        long len;
-       struct cdbdata *cdbut;
        rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
 
        /* Find out if we've already seen this item */
-       strcpy(Ctx->ThisMsg->ut.ut_msgid,
-              ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
-       Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
-
-       cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
+// todo: expiry?
 #ifndef DEBUG_RSS
-       if (cdbut != NULL) {
+       if (CheckIfAlreadySeen("RSS Item Seen",
+                              Ctx->ThisMsg->MsgGUID,
+                              IO->Now,
+                              IO->Now - USETABLE_ANTIEXPIRE,
+                              eCheckUpdate,
+                              IO->ID, CCID)
+           != 0)
+       {
                /* Item has already been seen */
                EVRSSC_syslog(LOG_DEBUG,
                          "%s has already been seen\n",
                          ChrPtr(Ctx->ThisMsg->MsgGUID));
-               cdb_free(cdbut);
-
-               /* rewrite the record anyway, to update the timestamp */
-               cdb_store(CDB_USETABLE,
-                         SKEY(Ctx->ThisMsg->MsgGUID),
-                         &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
 
                if (GetNextHashPos(Ctx->Messages,
                                   Ctx->Pos,
@@ -425,15 +421,14 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
                NextDBOperation(IO, RSSSaveMessage);
                return eSendMore;
        }
+       return eSendMore;
 }
 
 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
 {
-       struct UseTable ut;
        u_char rawdigest[MD5_DIGEST_LEN];
        struct MD5Context md5context;
        StrBuf *guid;
-       struct cdbdata *cdbut;
        rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
 
        if (IO->HttpReq.httpcode != 200)
@@ -462,7 +457,9 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
                CtdlAideFPMessage(
                        ChrPtr(ErrMsg),
                        "RSS Aggregation run failure",
-                       2, strs, (long*) &lens);
+                       2, strs, (long*) &lens,
+                       IO->Now,
+                       IO->ID, CCID);
                FreeStrBuf(&ErrMsg);
                return eAbort;
        }
@@ -483,25 +480,22 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
        if (StrLength(guid) > 40)
                StrBufCutAt(guid, 40, NULL);
        /* Find out if we've already seen this item */
-       memcpy(ut.ut_msgid, SKEY(guid));
-       ut.ut_timestamp = time(NULL);
 
-       cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid));
 #ifndef DEBUG_RSS
-       if (cdbut != NULL) {
-               /* Item has already been seen */
-               EVRSSC_syslog(LOG_DEBUG,
-                             "%s has already been seen\n",
-                             ChrPtr(Ctx->Url));
-               cdb_free(cdbut);
-       }
 
-       /* rewrite the record anyway, to update the timestamp */
-       cdb_store(CDB_USETABLE,
-                 SKEY(guid),
-                 &ut, sizeof(struct UseTable) );
+       if (CheckIfAlreadySeen("RSS Whole",
+                              guid,
+                              IO->Now,
+                              IO->Now - USETABLE_ANTIEXPIRE,
+                              eCheckUpdate,
+                              IO->ID, CCID)
+           != 0)
+       {
+               FreeStrBuf(&guid);
+
+               return eAbort;
+       }
        FreeStrBuf(&guid);
-       if (cdbut != NULL) return eAbort;
 #endif
        return RSSAggregator_ParseReply(IO);
 }
@@ -556,21 +550,12 @@ int rss_do_fetching(rss_aggregator *RSSAggr)
 /*
  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
  */
-void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
+void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG)
 {
-       StrBuf *CfgData=NULL;
-       StrBuf *CfgType;
-       StrBuf *Line;
-       rss_room_counter *Count = NULL;
-       struct stat statbuf;
-       char filename[PATH_MAX];
-       int fd;
-       int Done;
+       const RoomNetCfgLine *pLine;
        rss_aggregator *RSSAggr = NULL;
        rss_aggregator *use_this_RSSAggr = NULL;
        void *vptr;
-       const char *CfgPtr, *lPtr;
-       const char *Err;
 
        pthread_mutex_lock(&RSSQueueMutex);
        if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
@@ -584,143 +569,77 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
        }
        pthread_mutex_unlock(&RSSQueueMutex);
 
-       assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
-
-       if (server_shutting_down)
-               return;
+       if (server_shutting_down) return;
 
-       /* Only do net processing for rooms that have netconfigs */
-       fd = open(filename, 0);
-       if (fd <= 0) {
-               /* syslog(LOG_DEBUG,
-                  "rssclient: %s no config.\n",
-                  qrbuf->QRname); */
-               return;
-       }
-
-       if (server_shutting_down)
-               return;
+       pLine = OneRNCFG->NetConfigs[rssclient];
 
-       if (fstat(fd, &statbuf) == -1) {
-               EVRSSQ_syslog(LOG_DEBUG,
-                             "ERROR: could not stat configfile '%s' - %s\n",
-                             filename,
-                             strerror(errno));
-               return;
-       }
+       while (pLine != NULL)
+       {
+               const char *lPtr = NULL;
 
-       if (server_shutting_down)
-               return;
+               RSSAggr = (rss_aggregator *) malloc(
+                       sizeof(rss_aggregator));
 
-       CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
+               memset (RSSAggr, 0, sizeof(rss_aggregator));
+               RSSAggr->QRnumber = qrbuf->QRnumber;
+               RSSAggr->roomlist_parts = 1;
+               RSSAggr->Url = NewStrBufPlain(NULL, StrLength(pLine->Value[0]));
+               StrBufExtract_NextToken(RSSAggr->Url,
+                                       pLine->Value[0],
+                                       &lPtr,
+                                       '|');
 
-       if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
-               close(fd);
-               FreeStrBuf(&CfgData);
-               EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s<br>\n",
-                             filename, strerror(errno));
-               return;
-       }
-       close(fd);
-       if (server_shutting_down)
-               return;
+               pthread_mutex_lock(&RSSQueueMutex);
+               GetHash(RSSFetchUrls,
+                       SKEY(RSSAggr->Url),
+                       &vptr);
 
-       CfgPtr = NULL;
-       CfgType = NewStrBuf();
-       Line = NewStrBufPlain(NULL, StrLength(CfgData));
-       Done = 0;
-       while (!Done)
-       {
-               Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
-               if (StrLength(Line) > 0)
+               use_this_RSSAggr = (rss_aggregator *)vptr;
+               if (use_this_RSSAggr != NULL)
                {
-                       lPtr = NULL;
-                       StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
-                       if (!strcasecmp("rssclient", ChrPtr(CfgType)))
+                       long *QRnumber;
+                       StrBufAppendBufPlain(
+                               use_this_RSSAggr->rooms,
+                               qrbuf->QRname,
+                               -1, 0);
+                       if (use_this_RSSAggr->roomlist_parts==1)
                        {
-                               if (Count == NULL)
-                               {
-                                       Count = malloc(
-                                               sizeof(rss_room_counter));
-                                       Count->count = 0;
-                               }
-                               Count->count ++;
-                               RSSAggr = (rss_aggregator *) malloc(
-                                       sizeof(rss_aggregator));
-
-                               memset (RSSAggr, 0, sizeof(rss_aggregator));
-                               RSSAggr->QRnumber = qrbuf->QRnumber;
-                               RSSAggr->roomlist_parts = 1;
-                               RSSAggr->Url = NewStrBuf();
-
-                               StrBufExtract_NextToken(RSSAggr->Url,
-                                                       Line,
-                                                       &lPtr,
-                                                       '|');
-
-                               pthread_mutex_lock(&RSSQueueMutex);
-                               GetHash(RSSFetchUrls,
-                                       SKEY(RSSAggr->Url),
-                                       &vptr);
-
-                               use_this_RSSAggr = (rss_aggregator *)vptr;
-                               if (use_this_RSSAggr != NULL)
-                               {
-                                       long *QRnumber;
-                                       StrBufAppendBufPlain(
-                                               use_this_RSSAggr->rooms,
-                                               qrbuf->QRname,
-                                               -1, 0);
-                                       if (use_this_RSSAggr->roomlist_parts==1)
-                                       {
-                                               use_this_RSSAggr->OtherQRnumbers
-                                                       = NewHash(1, lFlathash);
-                                       }
-                                       QRnumber = (long*)malloc(sizeof(long));
-                                       *QRnumber = qrbuf->QRnumber;
-                                       Put(use_this_RSSAggr->OtherQRnumbers,
-                                           LKEY(qrbuf->QRnumber),
-                                           QRnumber,
-                                           NULL);
-                                       use_this_RSSAggr->roomlist_parts++;
-
-                                       pthread_mutex_unlock(&RSSQueueMutex);
-
-                                       FreeStrBuf(&RSSAggr->Url);
-                                       free(RSSAggr);
-                                       RSSAggr = NULL;
-                                       continue;
-                               }
-                               pthread_mutex_unlock(&RSSQueueMutex);
-
-                               RSSAggr->ItemType = RSS_UNSET;
-
-                               RSSAggr->rooms = NewStrBufPlain(
-                                       qrbuf->QRname, -1);
-
-                               pthread_mutex_lock(&RSSQueueMutex);
-
-                               Put(RSSFetchUrls,
-                                   SKEY(RSSAggr->Url),
-                                   RSSAggr,
-                                   DeleteRssCfg);
-
-                               pthread_mutex_unlock(&RSSQueueMutex);
+                               use_this_RSSAggr->OtherQRnumbers
+                                       = NewHash(1, lFlathash);
                        }
+                       QRnumber = (long*)malloc(sizeof(long));
+                       *QRnumber = qrbuf->QRnumber;
+                       Put(use_this_RSSAggr->OtherQRnumbers,
+                           LKEY(qrbuf->QRnumber),
+                           QRnumber,
+                           NULL);
+                       use_this_RSSAggr->roomlist_parts++;
+
+                       pthread_mutex_unlock(&RSSQueueMutex);
+
+                       FreeStrBuf(&RSSAggr->Url);
+                       free(RSSAggr);
+                       RSSAggr = NULL;
+                       pLine = pLine->next;
+                       continue;
                }
-       }
-       if (Count != NULL)
-       {
-               Count->QRnumber = qrbuf->QRnumber;
+               pthread_mutex_unlock(&RSSQueueMutex);
+
+               RSSAggr->ItemType = RSS_UNSET;
+
+               RSSAggr->rooms = NewStrBufPlain(
+                       qrbuf->QRname, -1);
+
                pthread_mutex_lock(&RSSQueueMutex);
-               EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n",
-                             qrbuf->QRnumber, qrbuf->QRname);
-               Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
+
+               Put(RSSFetchUrls,
+                   SKEY(RSSAggr->Url),
+                   RSSAggr,
+                   DeleteRssCfg);
+
                pthread_mutex_unlock(&RSSQueueMutex);
+               pLine = pLine->next;
        }
-       FreeStrBuf(&CfgData);
-       FreeStrBuf(&CfgType);
-       FreeStrBuf(&Line);
 }
 
 /*
@@ -764,7 +683,7 @@ void rssclient_scan(void) {
 
        become_session(&rss_CC);
        EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
-       CtdlForEachRoom(rssclient_scan_room, NULL);
+       CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient);
 
        pthread_mutex_lock(&RSSQueueMutex);