From 66f09fdd7c3f58e13ed01431dd5148302327ca6e Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Fri, 29 Mar 2013 11:53:03 +0100 Subject: [PATCH] SEEN-Database: refactor database interface for remembering whether we already aggregated messages - concentrate all application interactions in database.c in CheckIfAlreadySeen - add debug log facility for seen access - reduce write access to the table by only refreshing records every 4 days (we need to do this, else they will expire after 7 days) --- citadel/database.c | 81 +++++++++++++++++++ citadel/database.h | 16 ++++ citadel/include/ctdl_module.h | 7 +- citadel/modules/network/serv_network.c | 32 ++++---- .../networkclient/serv_networkclient.c | 4 +- citadel/modules/pop3client/serv_pop3client.c | 48 ++++------- citadel/modules/rssclient/rss_atom_parser.h | 1 - citadel/modules/rssclient/serv_rssclient.c | 69 ++++++---------- citadel/msgbase.c | 35 ++++---- citadel/msgbase.h | 5 +- citadel/netconfig.c | 7 +- citadel/scripts/mk_module_init.sh | 2 +- citadel/sysconfig.h | 1 + 13 files changed, 188 insertions(+), 120 deletions(-) diff --git a/citadel/database.c b/citadel/database.c index e2654b6e6..3bab5776d 100644 --- a/citadel/database.c +++ b/citadel/database.c @@ -906,3 +906,84 @@ void cdb_trunc(int cdb) } } } + +int SeentDebugEnabled = 0; + +#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (SeentDebugEnabled != 0)) +#define SEENM_syslog(LEVEL, FORMAT) \ + DBGLOG(LEVEL) syslog(LEVEL, \ + "IO[%ld]CC[%ld] SEEN[%s][%d] " FORMAT, \ + ioid, ccid, Facility, cType) + +time_t CheckIfAlreadySeen(const char *Facility, + StrBuf *guid, + time_t now, + time_t antiexpire, + eCheckType cType, + long ccid, + long ioid) +{ + struct UseTable ut; + struct cdbdata *cdbut; + + if (cType != eWrite) + { + time_t InDBTimeStamp = 0; + SEENM_syslog(LOG_DEBUG, "Loading"); + cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid)); + if (cdbut != NULL) { + memcpy(&ut, cdbut->ptr, + ((cdbut->len > sizeof(struct UseTable)) ? + sizeof(struct UseTable) : cdbut->len)); + + if (ut.ut_timestamp > antiexpire) + { + SEENM_syslog(LOG_DEBUG, "Found - Not expired."); + cdb_free(cdbut); + return ut.ut_timestamp; + } + else + { + SEENM_syslog(LOG_DEBUG, "Found - Expired."); + InDBTimeStamp = ut.ut_timestamp; + cdb_free(cdbut); + } + } + else + { + SEENM_syslog(LOG_DEBUG, "not Found"); + } + + if (cType == eCheckExist) + return InDBTimeStamp; + } + + memcpy(ut.ut_msgid, SKEY(guid)); + ut.ut_timestamp = now; + + SEENM_syslog(LOG_DEBUG, "Saving"); + /* rewrite the record anyway, to update the timestamp */ + cdb_store(CDB_USETABLE, + SKEY(guid), + &ut, sizeof(struct UseTable) ); + + SEENM_syslog(LOG_DEBUG, "Done Saving"); + return 0; +} + + +void LogDebugEnableSeenEnable(const int n) +{ + SeentDebugEnabled = n; +} + +CTDL_MODULE_INIT(database) +{ + if (!threading) + { + CtdlRegisterDebugFlagHook(HKEY("SeenDebug"), LogDebugEnableSeenEnable, &SeentDebugEnabled); + } + + /* return our module id for the log */ + return "database"; +} diff --git a/citadel/database.h b/citadel/database.h index a9f0bbfd3..88129b49d 100644 --- a/citadel/database.h +++ b/citadel/database.h @@ -51,5 +51,21 @@ struct CtdlCompressHeader { size_t compressed_len; }; +typedef enum __eCheckType { + eCheckExist, + eCheckUpdate, + eUpdate, + eWrite +}eCheckType; + +time_t CheckIfAlreadySeen(const char *Facility, + StrBuf *guid, + time_t now, + time_t antiexpire, + eCheckType cType, + long ccid, + long ioid); + + #endif /* DATABASE_H */ diff --git a/citadel/include/ctdl_module.h b/citadel/include/ctdl_module.h index 685771863..a96cdffe5 100644 --- a/citadel/include/ctdl_module.h +++ b/citadel/include/ctdl_module.h @@ -77,7 +77,7 @@ SUBJECT) -#define CtdlAideFPMessage(TEXT, SUBJECT, N, STR, STRLEN) \ +#define CtdlAideFPMessage(TEXT, SUBJECT, N, STR, STRLEN, ccid, ioid, TIME) \ flood_protect_quickie_message( \ "Citadel", \ NULL, \ @@ -88,7 +88,10 @@ SUBJECT, \ N, \ STR, \ - STRLEN) + STRLEN, \ + ccid, \ + ioid, \ + TIME) /* * Hook functions available to modules. */ diff --git a/citadel/modules/network/serv_network.c b/citadel/modules/network/serv_network.c index 6c9ed94b9..fb5476cc9 100644 --- a/citadel/modules/network/serv_network.c +++ b/citadel/modules/network/serv_network.c @@ -106,10 +106,9 @@ struct RoomProcList *rplist = NULL; */ int network_usetable(struct CtdlMessage *msg) { + StrBuf *msgid; struct CitContext *CCC = CC; - char msgid[SIZ]; - struct cdbdata *cdbut; - struct UseTable ut; + time_t now; /* Bail out if we can't generate a message ID */ if (msg == NULL) { @@ -123,28 +122,29 @@ int network_usetable(struct CtdlMessage *msg) } /* Generate the message ID */ - strcpy(msgid, msg->cm_fields['I']); - if (haschar(msgid, '@') == 0) { - strcat(msgid, "@"); + msgid = NewStrBufPlain(msg->cm_fields['I'], -1); + if (haschar(ChrPtr(msgid), '@') == 0) { + StrBufAppendBufPlain(msgid, HKEY("@"), 0); if (msg->cm_fields['N'] != NULL) { - strcat(msgid, msg->cm_fields['N']); + StrBufAppendBufPlain(msgid, msg->cm_fields['N'], -1, 0); } else { + FreeStrBuf(&msgid); return(0); } } - - cdbut = cdb_fetch(CDB_USETABLE, msgid, strlen(msgid)); - if (cdbut != NULL) { - cdb_free(cdbut); - QN_syslog(LOG_DEBUG, "network_usetable() : we already have %s\n", msgid); + now = time(NULL); + if (CheckIfAlreadySeen("Networker Import", + msgid, + now, 0, + eCheckUpdate, + CCC->cs_pid, 0) != 0) + { + FreeStrBuf(&msgid); return(1); } + FreeStrBuf(&msgid); - /* If we got to this point, it's unique: add it. */ - strcpy(ut.ut_msgid, msgid); - ut.ut_timestamp = time(NULL); - cdb_store(CDB_USETABLE, msgid, strlen(msgid), &ut, sizeof(struct UseTable) ); return(0); } diff --git a/citadel/modules/networkclient/serv_networkclient.c b/citadel/modules/networkclient/serv_networkclient.c index d92016620..cc4ac6260 100644 --- a/citadel/modules/networkclient/serv_networkclient.c +++ b/citadel/modules/networkclient/serv_networkclient.c @@ -209,7 +209,9 @@ eNextState SendFailureMessage(AsyncIO *IO) CtdlAideFPMessage( ChrPtr(NW->IO.ErrMsg), "Networker error", - 2, strs, (long*) &lens); + 2, strs, (long*) &lens, + IO->Now, + IO->ID, CCID); return eAbort; } diff --git a/citadel/modules/pop3client/serv_pop3client.c b/citadel/modules/pop3client/serv_pop3client.c index 81feca096..2d4c02530 100644 --- a/citadel/modules/pop3client/serv_pop3client.c +++ b/citadel/modules/pop3client/serv_pop3client.c @@ -373,7 +373,6 @@ eNextState POP3_FetchNetworkUsetableEntry(AsyncIO *IO) long HKLen; const char *HKey; void *vData; - struct cdbdata *cdbut; pop3aggr *RecvMsg = (pop3aggr *) IO->Data; if((RecvMsg->Pos != NULL) && @@ -383,32 +382,19 @@ eNextState POP3_FetchNetworkUsetableEntry(AsyncIO *IO) &HKey, &vData)) { - struct UseTable ut; if (server_shutting_down) return eAbort; - RecvMsg->CurrMsg = (FetchItem*) vData; - EVP3CCS_syslog(LOG_DEBUG, - "CHECKING: whether %s has already been seen: ", - ChrPtr(RecvMsg->CurrMsg->MsgUID)); - - /* Find out if we've already seen this item */ - safestrncpy(ut.ut_msgid, - ChrPtr(RecvMsg->CurrMsg->MsgUID), - sizeof(ut.ut_msgid)); - ut.ut_timestamp = time(NULL);/// TODO: libev timestamp! - - cdbut = cdb_fetch(CDB_USETABLE, SKEY(RecvMsg->CurrMsg->MsgUID)); - if (cdbut != NULL) { + if (CheckIfAlreadySeen("POP3 Item Seen", + RecvMsg->CurrMsg->MsgUID, + IO->Now, + IO->Now, //// todo + eCheckUpdate, + IO->ID, CCID) + != 0) + { /* Item has already been seen */ - EVP3CCSM_syslog(LOG_DEBUG, "YES\n"); - cdb_free(cdbut); - - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(RecvMsg->CurrMsg->MsgUID), - &ut, sizeof(struct UseTable) ); - RecvMsg->CurrMsg->NeedFetch = 0; ////TODO0; + RecvMsg->CurrMsg->NeedFetch = 0; } else { @@ -548,21 +534,17 @@ eNextState POP3C_ReadMessageBodyFollowing(pop3aggr *RecvMsg) eNextState POP3C_StoreMsgRead(AsyncIO *IO) { pop3aggr *RecvMsg = (pop3aggr *) IO->Data; - struct UseTable ut; EVP3CCS_syslog(LOG_DEBUG, "MARKING: %s as seen: ", ChrPtr(RecvMsg->CurrMsg->MsgUID)); + CheckIfAlreadySeen("POP3 Item Seen", + RecvMsg->CurrMsg->MsgUID, + IO->Now, + IO->Now, //// todo + eWrite, + IO->ID, CCID); - safestrncpy(ut.ut_msgid, - ChrPtr(RecvMsg->CurrMsg->MsgUID), - sizeof(ut.ut_msgid)); - ut.ut_timestamp = time(NULL); /* TODO: use libev time */ - cdb_store(CDB_USETABLE, - ChrPtr(RecvMsg->CurrMsg->MsgUID), - StrLength(RecvMsg->CurrMsg->MsgUID), - &ut, - sizeof(struct UseTable) ); StopDBWatchers(IO); return QueueEventContext(&RecvMsg->IO, POP3_C_ReAttachToFetchMessages); } diff --git a/citadel/modules/rssclient/rss_atom_parser.h b/citadel/modules/rssclient/rss_atom_parser.h index 43b60f12e..3bc4c7826 100644 --- a/citadel/modules/rssclient/rss_atom_parser.h +++ b/citadel/modules/rssclient/rss_atom_parser.h @@ -66,7 +66,6 @@ typedef struct __networker_save_message { struct CtdlMessage Msg; StrBuf *MsgGUID; StrBuf *Message; - struct UseTable ut; StrBuf *author_email; StrBuf *author_or_creator; diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index 63419534d..7cc8160fb 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -371,10 +371,8 @@ eNextState RSSSaveMessage(AsyncIO *IO) CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0); /* write the uidl to the use table so we don't store this item again */ - cdb_store(CDB_USETABLE, - SKEY(RSSAggr->ThisMsg->MsgGUID), - &RSSAggr->ThisMsg->ut, - sizeof(struct UseTable) ); + + CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, IO->Now, 0, eWrite, IO->ID, CCID); if (GetNextHashPos(RSSAggr->Messages, RSSAggr->Pos, @@ -389,27 +387,23 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) { const char *Key; long len; - struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; /* Find out if we've already seen this item */ - strcpy(Ctx->ThisMsg->ut.ut_msgid, - ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO - Ctx->ThisMsg->ut.ut_timestamp = time(NULL); - - cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID)); +// todo: expiry? #ifndef DEBUG_RSS - if (cdbut != NULL) { + if (CheckIfAlreadySeen("RSS Item Seen", + Ctx->ThisMsg->MsgGUID, + IO->Now, + IO->Now - USETABLE_ANTIEXPIRE, + eCheckUpdate, + IO->ID, CCID) + != 0) + { /* Item has already been seen */ EVRSSC_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID)); - cdb_free(cdbut); - - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(Ctx->ThisMsg->MsgGUID), - &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); if (GetNextHashPos(Ctx->Messages, Ctx->Pos, @@ -427,15 +421,14 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) NextDBOperation(IO, RSSSaveMessage); return eSendMore; } + return eSendMore; } eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) { - struct UseTable ut; u_char rawdigest[MD5_DIGEST_LEN]; struct MD5Context md5context; StrBuf *guid; - struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; if (IO->HttpReq.httpcode != 200) @@ -464,7 +457,9 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) CtdlAideFPMessage( ChrPtr(ErrMsg), "RSS Aggregation run failure", - 2, strs, (long*) &lens); + 2, strs, (long*) &lens, + IO->Now, + IO->ID, CCID); FreeStrBuf(&ErrMsg); return eAbort; } @@ -487,32 +482,20 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) /* Find out if we've already seen this item */ #ifndef DEBUG_RSS - cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid)); - if (cdbut != NULL) { - memcpy(&ut, cdbut->ptr, - ((cdbut->len > sizeof(struct UseTable)) ? - sizeof(struct UseTable) : cdbut->len)); - - if (IO->Now - ut.ut_timestamp > - 60 * 60 * 24 * 4) - { - /* Item has already been seen in the last 4 days */ - EVRSSC_syslog(LOG_DEBUG, - "%s has already been seen\n", - ChrPtr(Ctx->Url)); - } - cdb_free(cdbut); - } - memcpy(ut.ut_msgid, SKEY(guid)); - ut.ut_timestamp = IO->Now; + if (CheckIfAlreadySeen("RSS Whole", + guid, + IO->Now, + IO->Now - USETABLE_ANTIEXPIRE, + eCheckUpdate, + IO->ID, CCID) + != 0) + { + FreeStrBuf(&guid); - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(guid), - &ut, sizeof(struct UseTable) ); + return eAbort; + } FreeStrBuf(&guid); - if (cdbut != NULL) return eAbort; #endif return RSSAggregator_ParseReply(IO); } diff --git a/citadel/msgbase.c b/citadel/msgbase.c index 825e8970a..5c10578c0 100644 --- a/citadel/msgbase.c +++ b/citadel/msgbase.c @@ -3606,18 +3606,18 @@ void flood_protect_quickie_message(const char *from, const char *subject, int nCriterions, const char **CritStr, - long *CritStrLen) + long *CritStrLen, + long ccid, + long ioid, + time_t NOW) { int i; - struct UseTable ut; u_char rawdigest[MD5_DIGEST_LEN]; struct MD5Context md5context; StrBuf *guid; - struct cdbdata *cdbut; char timestamp[64]; long tslen; - time_t ts = time(NULL); - time_t tsday = ts / (8*60*60); /* just care for a day... */ + time_t tsday = NOW / (8*60*60); /* just care for a day... */ tslen = snprintf(timestamp, sizeof(timestamp), "%ld", tsday); MD5Init(&md5context); @@ -3635,27 +3635,22 @@ void flood_protect_quickie_message(const char *from, StrBufAppendBufPlain(guid, HKEY("_fldpt"), 0); if (StrLength(guid) > 40) StrBufCutAt(guid, 40, NULL); - /* Find out if we've already sent a similar message */ - memcpy(ut.ut_msgid, SKEY(guid)); - ut.ut_timestamp = ts; - cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid)); - - if (cdbut != NULL) { + if (CheckIfAlreadySeen("FPAideMessage", + guid, + NOW, + tsday, + eUpdate, + ccid, + ioid)!= 0) + { + FreeStrBuf(&guid); /* yes, we did. flood protection kicks in. */ syslog(LOG_DEBUG, "not sending message again\n"); - cdb_free(cdbut); + return; } - - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(guid), - &ut, sizeof(struct UseTable) ); - FreeStrBuf(&guid); - - if (cdbut != NULL) return; /* no, this message isn't sent recently; go ahead. */ quickie_message(from, fromaddr, diff --git a/citadel/msgbase.h b/citadel/msgbase.h index c051f490e..e9f8a3f84 100644 --- a/citadel/msgbase.h +++ b/citadel/msgbase.h @@ -129,7 +129,10 @@ void flood_protect_quickie_message(const char *from, const char *subject, int nCriterions, const char **CritStr, - long *CritStrLen); + long *CritStrLen, + long ccid, + long ioid, + time_t NOW); void cmd_ent0 (char *entargs); void cmd_dele (char *delstr); diff --git a/citadel/netconfig.c b/citadel/netconfig.c index 1b2e974aa..682d4e05a 100644 --- a/citadel/netconfig.c +++ b/citadel/netconfig.c @@ -926,7 +926,8 @@ void cmd_netp(char *cmdbuf) CtdlAideFPMessage( err_buf, "IGNet Networking.", - 2, strs, (long*) &lens); + 2, strs, (long*) &lens, + time(NULL)); DeleteHash(&working_ignetcfg); FreeStrBuf(&NodeStr); @@ -951,7 +952,9 @@ void cmd_netp(char *cmdbuf) CtdlAideFPMessage( err_buf, "IGNet Networking.", - 2, strs, (long*) &lens); + 2, strs, + (long*) &lens, + time(NULL)); DeleteHash(&working_ignetcfg); FreeStrBuf(&NodeStr); diff --git a/citadel/scripts/mk_module_init.sh b/citadel/scripts/mk_module_init.sh index f78fa8043..cc51ae933 100755 --- a/citadel/scripts/mk_module_init.sh +++ b/citadel/scripts/mk_module_init.sh @@ -30,7 +30,7 @@ U_FILE="$CUR_DIR/modules_upgrade.c" /usr/bin/printf "Scanning extension modules for entry points.\n" -STATIC_FIRST_MODULES="citserver control modules euidindex file_ops msgbase room_ops user_ops nttlist" +STATIC_FIRST_MODULES="citserver control modules euidindex file_ops msgbase room_ops user_ops nttlist database" DYNAMIC_MODULES=`grep CTDL_MODULE_INIT modules/*/*.c |$SED 's;.*(\(.*\));\1;'` if test -d user_modules; then USER_MODULES=`grep CTDL_MODULE_INIT user_modules/*/*.c |$SED 's;.*(\(.*\));\1;'` diff --git a/citadel/sysconfig.h b/citadel/sysconfig.h index a2a00f52b..3d9a259d6 100644 --- a/citadel/sysconfig.h +++ b/citadel/sysconfig.h @@ -100,6 +100,7 @@ * How long (in seconds) to retain message entries in the use table */ #define USETABLE_RETAIN 604800L /* 7 days */ +#define USETABLE_ANTIEXPIRE 345600l /* 4 days */ /* * The size of per-thread stacks. If set too low, citserver will randomly crash. -- 2.30.2