SEEN-Database: refactor database interface for remembering whether we already aggrega...
author: Wilfried Goesgens <dothebart@citadel.org>
        Fri, 29 Mar 2013 10:53:03 +0000 (11:53 +0100)
committer: Wilfried Goesgens <dothebart@citadel.org>
        Fri, 29 Mar 2013 10:53:03 +0000 (11:53 +0100)
  - concentrate all application interactions in database.c in CheckIfAlreadySeen
  - add debug log facility for seen access
  - reduce write access to the table by refreshing a record only once it is older than 4 days (records must still be refreshed periodically, otherwise they expire after 7 days)

13 files changed:
citadel/database.c
citadel/database.h
citadel/include/ctdl_module.h
citadel/modules/network/serv_network.c
citadel/modules/networkclient/serv_networkclient.c
citadel/modules/pop3client/serv_pop3client.c
citadel/modules/rssclient/rss_atom_parser.h
citadel/modules/rssclient/serv_rssclient.c
citadel/msgbase.c
citadel/msgbase.h
citadel/netconfig.c
citadel/scripts/mk_module_init.sh
citadel/sysconfig.h

index e2654b6e65fb010219e41d8807a1df2842579774..3bab5776d90ebe522eefb34b1c751f631de98ca1 100644 (file)
@@ -906,3 +906,84 @@ void cdb_trunc(int cdb)
                }
        }
 }
+
+int SeentDebugEnabled = 0;
+
+#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (SeentDebugEnabled != 0))
+#define SEENM_syslog(LEVEL, FORMAT)                                    \
+       DBGLOG(LEVEL) syslog(LEVEL,                                     \
+                            "IO[%ld]CC[%ld] SEEN[%s][%d] " FORMAT,     \
+                            ioid, ccid, Facility, cType)
+
+/*
+ * Look up and/or record `guid` in the use table (CDB_USETABLE).
+ *
+ * Facility    label used in the debug log lines only
+ * guid        key of the record; also copied into the record's ut_msgid
+ * now         timestamp written into the record when it is (re)stored
+ * antiexpire  absolute cutoff: a stored record whose timestamp is greater
+ *             than this is considered fresh and is not rewritten
+ * cType       eWrite: skip the lookup and store unconditionally;
+ *             eCheckExist: look up only, never store;
+ *             any other value: look up, and store when no fresh record exists
+ * ccid, ioid  identifiers used in the debug log lines only
+ *
+ * Returns the timestamp found in the database, or 0 when no record was found
+ * (or no lookup was done, i.e. eWrite).  A record that exists but is older
+ * than the cutoff is refreshed AND still reported through its old timestamp,
+ * so callers testing `!= 0` keep treating the item as already seen.
+ */
+time_t CheckIfAlreadySeen(const char *Facility,
+                         StrBuf *guid,
+                         time_t now,
+                         time_t antiexpire,
+                         eCheckType cType,
+                         long ccid,
+                         long ioid)
+{
+       struct UseTable ut;
+       struct cdbdata *cdbut;
+       time_t InDBTimeStamp = 0;
+       size_t KeyLen;
+
+       /* a stored record may be shorter than the struct; start from zeroes */
+       memset(&ut, 0, sizeof(ut));
+
+       if (cType != eWrite)
+       {
+               SEENM_syslog(LOG_DEBUG, "Loading");
+               cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid));
+               if (cdbut != NULL) {
+                       memcpy(&ut, cdbut->ptr,
+                              ((cdbut->len > sizeof(struct UseTable)) ?
+                               sizeof(struct UseTable) : cdbut->len));
+                       InDBTimeStamp = ut.ut_timestamp;
+                       cdb_free(cdbut);
+
+                       if (InDBTimeStamp > antiexpire)
+                       {
+                               /* fresh enough: report it, skip the write */
+                               SEENM_syslog(LOG_DEBUG, "Found - Not expired.");
+                               return InDBTimeStamp;
+                       }
+                       SEENM_syslog(LOG_DEBUG, "Found - Expired.");
+               }
+               else
+               {
+                       SEENM_syslog(LOG_DEBUG, "not Found");
+               }
+
+               if (cType == eCheckExist)
+                       return InDBTimeStamp;
+       }
+
+       /*
+        * Build the record from scratch so no stale or uninitialised bytes
+        * reach the database, and never copy more than ut_msgid can hold
+        * (one byte is kept for the terminating NUL).
+        */
+       memset(&ut, 0, sizeof(ut));
+       KeyLen = (size_t) StrLength(guid);
+       if (KeyLen >= sizeof(ut.ut_msgid))
+               KeyLen = sizeof(ut.ut_msgid) - 1;
+       memcpy(ut.ut_msgid, ChrPtr(guid), KeyLen);
+       ut.ut_timestamp = now;
+
+       SEENM_syslog(LOG_DEBUG, "Saving");
+       /* rewrite the record anyway, to update the timestamp */
+       cdb_store(CDB_USETABLE,
+                 SKEY(guid),
+                 &ut, sizeof(struct UseTable) );
+
+       SEENM_syslog(LOG_DEBUG, "Done Saving");
+       /* 0 for a brand-new record, the previous timestamp for a refreshed one */
+       return InDBTimeStamp;
+}
+
+
+/*
+ * Setter for SeentDebugEnabled; registered below as the hook for the
+ * "SeenDebug" debug flag.  A non-zero n lets LOG_DEBUG messages of
+ * SEENM_syslog through, 0 suppresses them.
+ */
+void LogDebugEnableSeenEnable(const int n)
+{
+       SeentDebugEnabled = n;
+}
+
+/*
+ * Module entry point for the database layer.  When `threading` is zero it
+ * registers the "SeenDebug" debug flag; in every case it hands back the
+ * module id used in the log.
+ */
+CTDL_MODULE_INIT(database)
+{
+       /* nothing to register on the threading pass; just report our id */
+       if (threading)
+               return "database";
+
+       CtdlRegisterDebugFlagHook(HKEY("SeenDebug"),
+                                 LogDebugEnableSeenEnable,
+                                 &SeentDebugEnabled);
+
+       /* return our module id for the log */
+       return "database";
+}
index a9f0bbfd3a1d523a1212ec4e86bc39d2f109db10..88129b49d423e4d09ac725fb4b5d5c87f17877f8 100644 (file)
@@ -51,5 +51,21 @@ struct CtdlCompressHeader {
        size_t compressed_len;
 };
 
+/* Mode selector for CheckIfAlreadySeen(). */
+typedef enum __eCheckType {
+       eCheckExist,   /* look the record up only; never write */
+       eCheckUpdate,  /* look up; write when no fresh record exists */
+       eUpdate,       /* NOTE(review): handled exactly like eCheckUpdate by CheckIfAlreadySeen() - confirm intent */
+       eWrite         /* skip the lookup and write unconditionally */
+}eCheckType;
+
+/*
+ * Look up and/or record `guid` in the use table.
+ *
+ * Facility    label that appears in the debug log lines
+ * guid        key of the use-table record
+ * now         timestamp stored when the record is written
+ * antiexpire  absolute cutoff; a stored record with a timestamp greater than
+ *             this is reported as seen and left untouched
+ * cType       selects lookup-only, lookup-and-write, or write-only behaviour
+ * ccid, ioid  identifiers that appear in the debug log lines
+ *
+ * Returns the stored timestamp (non-zero) when a record newer than
+ * antiexpire exists, and 0 when no record exists.  See the definition in
+ * database.c for how records older than antiexpire are handled.
+ */
+time_t CheckIfAlreadySeen(const char *Facility,
+                         StrBuf *guid,
+                         time_t now,
+                         time_t antiexpire,
+                         eCheckType cType,
+                         long ccid,
+                         long ioid);
+
+
 #endif /* DATABASE_H */
 
index 68577186373401dc521a5fe5d372853c24c537ed..a96cdffe5bea402ac2693267a4e800350d2e1b3a 100644 (file)
@@ -77,7 +77,7 @@
                SUBJECT) 
 
 
-#define CtdlAideFPMessage(TEXT, SUBJECT, N, STR, STRLEN) \
+#define CtdlAideFPMessage(TEXT, SUBJECT, N, STR, STRLEN, ccid, ioid, TIME) \
        flood_protect_quickie_message(                   \
                "Citadel",                               \
                NULL,                                    \
                SUBJECT,                                 \
                N,                                       \
                STR,                                     \
-               STRLEN)
+               STRLEN,                                  \
+               ccid,                                    \
+               ioid,                                    \
+               TIME)
 /*
  * Hook functions available to modules.
  */
index 6c9ed94b95c7625c1656f1b8814ccb8aa02383bf..fb5476cc913f479187a7f73d5f84ff5724f0a619 100644 (file)
@@ -106,10 +106,9 @@ struct RoomProcList *rplist = NULL;
  */
 int network_usetable(struct CtdlMessage *msg)
 {
+       StrBuf *msgid;
        struct CitContext *CCC = CC;
-       char msgid[SIZ];
-       struct cdbdata *cdbut;
-       struct UseTable ut;
+       time_t now;
 
        /* Bail out if we can't generate a message ID */
        if (msg == NULL) {
@@ -123,28 +122,29 @@ int network_usetable(struct CtdlMessage *msg)
        }
 
        /* Generate the message ID */
-       strcpy(msgid, msg->cm_fields['I']);
-       if (haschar(msgid, '@') == 0) {
-               strcat(msgid, "@");
+       msgid = NewStrBufPlain(msg->cm_fields['I'], -1);
+       if (haschar(ChrPtr(msgid), '@') == 0) {
+               StrBufAppendBufPlain(msgid, HKEY("@"), 0);
                if (msg->cm_fields['N'] != NULL) {
-                       strcat(msgid, msg->cm_fields['N']);
+                       StrBufAppendBufPlain(msgid, msg->cm_fields['N'], -1, 0);
                }
                else {
+                       FreeStrBuf(&msgid);
                        return(0);
                }
        }
-
-       cdbut = cdb_fetch(CDB_USETABLE, msgid, strlen(msgid));
-       if (cdbut != NULL) {
-               cdb_free(cdbut);
-               QN_syslog(LOG_DEBUG, "network_usetable() : we already have %s\n", msgid);
+       now = time(NULL);
+       if (CheckIfAlreadySeen("Networker Import",
+                              msgid,
+                              now, 0,
+                              eCheckUpdate,
+                              CCC->cs_pid, 0) != 0)
+       {
+               FreeStrBuf(&msgid);
                return(1);
        }
+       FreeStrBuf(&msgid);
 
-       /* If we got to this point, it's unique: add it. */
-       strcpy(ut.ut_msgid, msgid);
-       ut.ut_timestamp = time(NULL);
-       cdb_store(CDB_USETABLE, msgid, strlen(msgid), &ut, sizeof(struct UseTable) );
        return(0);
 }
 
index d92016620bf5f35ef2f9f329710d1a1dc779bcf8..cc4ac6260a42504ebdbc84e275aa26f13e97213b 100644 (file)
@@ -209,7 +209,9 @@ eNextState SendFailureMessage(AsyncIO *IO)
        CtdlAideFPMessage(
                ChrPtr(NW->IO.ErrMsg),
                "Networker error",
-               2, strs, (long*) &lens);
+               2, strs, (long*) &lens,
+               IO->Now,
+               IO->ID, CCID);
        
        return eAbort;
 }
index 81feca096c9410fc52ac54865480b007e765a2cd..2d4c02530adc6470a0b9ed460abfc766c5ed0b8d 100644 (file)
@@ -373,7 +373,6 @@ eNextState POP3_FetchNetworkUsetableEntry(AsyncIO *IO)
        long HKLen;
        const char *HKey;
        void *vData;
-       struct cdbdata *cdbut;
        pop3aggr *RecvMsg = (pop3aggr *) IO->Data;
 
        if((RecvMsg->Pos != NULL) &&
@@ -383,32 +382,19 @@ eNextState POP3_FetchNetworkUsetableEntry(AsyncIO *IO)
                          &HKey,
                          &vData))
        {
-               struct UseTable ut;
                if (server_shutting_down)
                        return eAbort;
 
-               RecvMsg->CurrMsg = (FetchItem*) vData;
-               EVP3CCS_syslog(LOG_DEBUG,
-                              "CHECKING: whether %s has already been seen: ",
-                              ChrPtr(RecvMsg->CurrMsg->MsgUID));
-
-               /* Find out if we've already seen this item */
-               safestrncpy(ut.ut_msgid,
-                           ChrPtr(RecvMsg->CurrMsg->MsgUID),
-                           sizeof(ut.ut_msgid));
-               ut.ut_timestamp = time(NULL);/// TODO: libev timestamp!
-
-               cdbut = cdb_fetch(CDB_USETABLE, SKEY(RecvMsg->CurrMsg->MsgUID));
-               if (cdbut != NULL) {
+               if (CheckIfAlreadySeen("POP3 Item Seen",
+                                      RecvMsg->CurrMsg->MsgUID,
+                                      IO->Now,
+                                      IO->Now, //// todo
+                                      eCheckUpdate,
+                                      IO->ID, CCID)
+                   != 0)
+               {
                        /* Item has already been seen */
-                       EVP3CCSM_syslog(LOG_DEBUG, "YES\n");
-                       cdb_free(cdbut);
-
-                       /* rewrite the record anyway, to update the timestamp */
-                       cdb_store(CDB_USETABLE,
-                                 SKEY(RecvMsg->CurrMsg->MsgUID),
-                                 &ut, sizeof(struct UseTable) );
-                       RecvMsg->CurrMsg->NeedFetch = 0; ////TODO0;
+                       RecvMsg->CurrMsg->NeedFetch = 0;
                }
                else
                {
@@ -548,21 +534,17 @@ eNextState POP3C_ReadMessageBodyFollowing(pop3aggr *RecvMsg)
 eNextState POP3C_StoreMsgRead(AsyncIO *IO)
 {
        pop3aggr *RecvMsg = (pop3aggr *) IO->Data;
-       struct UseTable ut;
 
        EVP3CCS_syslog(LOG_DEBUG,
                       "MARKING: %s as seen: ",
                       ChrPtr(RecvMsg->CurrMsg->MsgUID));
+       CheckIfAlreadySeen("POP3 Item Seen",
+                          RecvMsg->CurrMsg->MsgUID,
+                          IO->Now,
+                          IO->Now, //// todo
+                          eWrite,
+                          IO->ID, CCID);
 
-       safestrncpy(ut.ut_msgid,
-                   ChrPtr(RecvMsg->CurrMsg->MsgUID),
-                   sizeof(ut.ut_msgid));
-       ut.ut_timestamp = time(NULL); /* TODO: use libev time */
-       cdb_store(CDB_USETABLE,
-                 ChrPtr(RecvMsg->CurrMsg->MsgUID),
-                 StrLength(RecvMsg->CurrMsg->MsgUID),
-                 &ut,
-                 sizeof(struct UseTable) );
        StopDBWatchers(IO);
        return QueueEventContext(&RecvMsg->IO, POP3_C_ReAttachToFetchMessages);
 }
index 43b60f12ec2ccaf426df9ef209dd8680343484cc..3bc4c7826988bb78956e0e47e5453e93d295aadb 100644 (file)
@@ -66,7 +66,6 @@ typedef struct __networker_save_message {
        struct CtdlMessage Msg;
        StrBuf *MsgGUID;
        StrBuf *Message;
-       struct UseTable ut;
 
        StrBuf *author_email;
        StrBuf *author_or_creator;
index 63419534d880aabf85f9b8b842530ec1051330ad..7cc8160fbb09fe172e8f4bbec9224c4560bc1c7f 100644 (file)
@@ -371,10 +371,8 @@ eNextState RSSSaveMessage(AsyncIO *IO)
        CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
 
        /* write the uidl to the use table so we don't store this item again */
-       cdb_store(CDB_USETABLE,
-                 SKEY(RSSAggr->ThisMsg->MsgGUID),
-                 &RSSAggr->ThisMsg->ut,
-                 sizeof(struct UseTable) );
+
+       CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, IO->Now, 0, eWrite, IO->ID, CCID);
 
        if (GetNextHashPos(RSSAggr->Messages,
                           RSSAggr->Pos,
@@ -389,27 +387,23 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
 {
        const char *Key;
        long len;
-       struct cdbdata *cdbut;
        rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
 
        /* Find out if we've already seen this item */
-       strcpy(Ctx->ThisMsg->ut.ut_msgid,
-              ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
-       Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
-
-       cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
+// todo: expiry?
 #ifndef DEBUG_RSS
-       if (cdbut != NULL) {
+       if (CheckIfAlreadySeen("RSS Item Seen",
+                              Ctx->ThisMsg->MsgGUID,
+                              IO->Now,
+                              IO->Now - USETABLE_ANTIEXPIRE,
+                              eCheckUpdate,
+                              IO->ID, CCID)
+           != 0)
+       {
                /* Item has already been seen */
                EVRSSC_syslog(LOG_DEBUG,
                          "%s has already been seen\n",
                          ChrPtr(Ctx->ThisMsg->MsgGUID));
-               cdb_free(cdbut);
-
-               /* rewrite the record anyway, to update the timestamp */
-               cdb_store(CDB_USETABLE,
-                         SKEY(Ctx->ThisMsg->MsgGUID),
-                         &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
 
                if (GetNextHashPos(Ctx->Messages,
                                   Ctx->Pos,
@@ -427,15 +421,14 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
                NextDBOperation(IO, RSSSaveMessage);
                return eSendMore;
        }
+       return eSendMore;
 }
 
 eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
 {
-       struct UseTable ut;
        u_char rawdigest[MD5_DIGEST_LEN];
        struct MD5Context md5context;
        StrBuf *guid;
-       struct cdbdata *cdbut;
        rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
 
        if (IO->HttpReq.httpcode != 200)
@@ -464,7 +457,9 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
                CtdlAideFPMessage(
                        ChrPtr(ErrMsg),
                        "RSS Aggregation run failure",
-                       2, strs, (long*) &lens);
+                       2, strs, (long*) &lens,
+                       IO->Now,
+                       IO->ID, CCID);
                FreeStrBuf(&ErrMsg);
                return eAbort;
        }
@@ -487,32 +482,20 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO)
        /* Find out if we've already seen this item */
 
 #ifndef DEBUG_RSS
-       cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid));
-       if (cdbut != NULL) {
-                memcpy(&ut, cdbut->ptr,
-                       ((cdbut->len > sizeof(struct UseTable)) ?
-                        sizeof(struct UseTable) : cdbut->len));
-
-               if (IO->Now - ut.ut_timestamp  > 
-                   60 * 60 * 24 * 4)
-               {
-                       /* Item has already been seen in the last 4 days */
-                       EVRSSC_syslog(LOG_DEBUG,
-                                     "%s has already been seen\n",
-                                     ChrPtr(Ctx->Url));
-               }
-               cdb_free(cdbut);
-       }
 
-       memcpy(ut.ut_msgid, SKEY(guid));
-       ut.ut_timestamp = IO->Now;
+       if (CheckIfAlreadySeen("RSS Whole",
+                              guid,
+                              IO->Now,
+                              IO->Now - USETABLE_ANTIEXPIRE,
+                              eCheckUpdate,
+                              IO->ID, CCID)
+           != 0)
+       {
+               FreeStrBuf(&guid);
 
-       /* rewrite the record anyway, to update the timestamp */
-       cdb_store(CDB_USETABLE,
-                 SKEY(guid),
-                 &ut, sizeof(struct UseTable) );
+               return eAbort;
+       }
        FreeStrBuf(&guid);
-       if (cdbut != NULL) return eAbort;
 #endif
        return RSSAggregator_ParseReply(IO);
 }
index 825e8970a9ca9f37d4f88b885377c52be2110701..5c10578c0160e711ab4464789833815bcda8bf91 100644 (file)
@@ -3606,18 +3606,18 @@ void flood_protect_quickie_message(const char *from,
                                   const char *subject,
                                   int nCriterions,
                                   const char **CritStr,
-                                  long *CritStrLen)
+                                  long *CritStrLen,
+                                  long ccid,
+                                  long ioid,
+                                  time_t NOW)
 {
        int i;
-       struct UseTable ut;
        u_char rawdigest[MD5_DIGEST_LEN];
        struct MD5Context md5context;
        StrBuf *guid;
-       struct cdbdata *cdbut;
        char timestamp[64];
        long tslen;
-       time_t ts = time(NULL);
-       time_t tsday = ts / (8*60*60); /* just care for a day... */
+       time_t tsday = NOW / (8*60*60); /* just care for a day... */
 
        tslen = snprintf(timestamp, sizeof(timestamp), "%ld", tsday);
        MD5Init(&md5context);
@@ -3635,27 +3635,22 @@ void flood_protect_quickie_message(const char *from,
        StrBufAppendBufPlain(guid, HKEY("_fldpt"), 0);
        if (StrLength(guid) > 40)
                StrBufCutAt(guid, 40, NULL);
-       /* Find out if we've already sent a similar message */
-       memcpy(ut.ut_msgid, SKEY(guid));
-       ut.ut_timestamp = ts;
 
-       cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid));
-
-       if (cdbut != NULL) {
+       if (CheckIfAlreadySeen("FPAideMessage",
+                              guid,
+                              NOW,
+                              tsday,
+                              eUpdate,
+                              ccid,
+                              ioid)!= 0)
+       {
+               FreeStrBuf(&guid);
                /* yes, we did. flood protection kicks in. */
                syslog(LOG_DEBUG,
                       "not sending message again\n");
-               cdb_free(cdbut);
+               return;
        }
-
-       /* rewrite the record anyway, to update the timestamp */
-       cdb_store(CDB_USETABLE,
-                 SKEY(guid),
-                 &ut, sizeof(struct UseTable) );
-       
        FreeStrBuf(&guid);
-
-       if (cdbut != NULL) return;
        /* no, this message isn't sent recently; go ahead. */
        quickie_message(from,
                        fromaddr,
index c051f490e31f63ef53494853962a32a2fe7cc84c..e9f8a3f84313682041e2a6696038c5834bf15742 100644 (file)
@@ -129,7 +129,10 @@ void flood_protect_quickie_message(const char *from,
                                   const char *subject,
                                   int nCriterions,
                                   const char **CritStr,
-                                  long *CritStrLen);
+                                  long *CritStrLen,
+                                  long ccid,
+                                  long ioid,
+                                  time_t NOW);
 
 void cmd_ent0 (char *entargs);
 void cmd_dele (char *delstr);
index 1b2e974aa6b42ad137397438dec0c7f620fbf27d..682d4e05acb301b59eb54a3293862e508743cb38 100644 (file)
@@ -926,7 +926,8 @@ void cmd_netp(char *cmdbuf)
                CtdlAideFPMessage(
                        err_buf,
                        "IGNet Networking.",
-                       2, strs, (long*) &lens);
+                       2, strs, (long*) &lens,
+                       time(NULL));
 
                DeleteHash(&working_ignetcfg);
                FreeStrBuf(&NodeStr);
@@ -951,7 +952,9 @@ void cmd_netp(char *cmdbuf)
                CtdlAideFPMessage(
                        err_buf,
                        "IGNet Networking.",
-                       2, strs, (long*) &lens);
+                       2, strs,
+                       (long*) &lens,
+                       time(NULL));
 
                DeleteHash(&working_ignetcfg);
                FreeStrBuf(&NodeStr);
index f78fa80439be0f4d36c8e777bcff074edfdd5f17..cc51ae933916b291a9d829643fb75878c1ca829c 100755 (executable)
@@ -30,7 +30,7 @@ U_FILE="$CUR_DIR/modules_upgrade.c"
 
 /usr/bin/printf "Scanning extension modules for entry points.\n"
 
-STATIC_FIRST_MODULES="citserver control modules euidindex file_ops msgbase room_ops user_ops nttlist"
+STATIC_FIRST_MODULES="citserver control modules euidindex file_ops msgbase room_ops user_ops nttlist database"
 DYNAMIC_MODULES=`grep CTDL_MODULE_INIT modules/*/*.c |$SED 's;.*(\(.*\));\1;'`
 if test -d user_modules; then 
     USER_MODULES=`grep CTDL_MODULE_INIT user_modules/*/*.c |$SED 's;.*(\(.*\));\1;'`
index a2a00f52b238d9b0e2fdf8f3f5f174b4be7e4e8a..3d9a259d61e2f2bd9e245d582829dcf5e36ee62e 100644 (file)
  * How long (in seconds) to retain message entries in the use table
  */
 #define USETABLE_RETAIN                604800L         /* 7 days */
+#define USETABLE_ANTIEXPIRE     345600l         /* 4 days */
 
 /*
  * The size of per-thread stacks.  If set too low, citserver will randomly crash.