From eef0aa8dfaf6928394a5465603d5390895a7e743 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Tue, 21 Feb 2012 23:42:09 +0100 Subject: [PATCH] SMTP-Client: implement tactics not to overload our peers - local context overload protection: try not to exceed our session limit (not possible if more mailinglist recipients than contexts) - rate limiting to reduce threat on DNS and Peers --- citadel/modules/smtp/serv_smtpqueue.c | 57 +++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/citadel/modules/smtp/serv_smtpqueue.c b/citadel/modules/smtp/serv_smtpqueue.c index 485395044..2d406f2e3 100644 --- a/citadel/modules/smtp/serv_smtpqueue.c +++ b/citadel/modules/smtp/serv_smtpqueue.c @@ -93,6 +93,9 @@ struct CitContext smtp_queue_CC; pthread_mutex_t ActiveQItemsLock; HashList *ActiveQItems = NULL; HashList *QItemHandlers = NULL; +int max_sessions_for_outbound_smtp = 500; /* how many sessions might be active till we stop adding more smtp jobs */ +int ndelay_count = 50; /* every n queued messages we will sleep... */ +int delay_msec = 5000; /* this many seconds. */ static const long MaxRetry = SMTP_RETRY_INTERVAL * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2; int MsgCount = 0; @@ -612,6 +615,7 @@ void smtpq_do_bounce(OneQueItem *MyQItem, StrBuf *OMsgTxt) * Called by smtp_do_queue() to handle an individual message. */ void smtp_do_procmsg(long msgnum, void *userdata) { + int mynumsessions = num_sessions; struct CtdlMessage *msg = NULL; char *instr = NULL; StrBuf *PlainQItem; @@ -626,6 +630,13 @@ void smtp_do_procmsg(long msgnum, void *userdata) { int HaveBuffers = 0; StrBuf *Msg =NULL; + if (mynumsessions > max_sessions_for_outbound_smtp) { + syslog(LOG_DEBUG, + "SMTP Queue: skipping because of num jobs %d > %d max_sessions_for_outbound_smtp", + mynumsessions, + max_sessions_for_outbound_smtp); + } + syslog(LOG_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum); ///strcpy(envelope_from, ""); @@ -770,8 +781,32 @@ void smtp_do_procmsg(long msgnum, void *userdata) { DeleteHashPos(&It); MyQItem->ActiveDeliveries = CountActiveQueueEntries(MyQItem); + + /* failsafe against overload: + * will we exceed the limit set? + */ + if ((MyQItem->ActiveDeliveries + mynumsessions > max_sessions_for_outbound_smtp) && + /* if yes, did we reach more than half of the quota? */ + ((mynumsessions * 2) > max_sessions_for_outbound_smtp) && + /* if... would we ever fit into half of the quota?? */ + (((MyQItem->ActiveDeliveries * 2) < max_sessions_for_outbound_smtp))) + { + /* abort delivery for another time. */ + syslog(LOG_DEBUG, + "SMTP Queue: skipping because of num jobs %d + %ld > %d max_sessions_for_outbound_smtp", + mynumsessions, + MyQItem->ActiveDeliveries, + max_sessions_for_outbound_smtp); + + FreeQueItem(&MyQItem); + + return; + } + + if (MyQItem->ActiveDeliveries > 0) { + int nActivated = 0; int n = MsgCount++; int m = MyQItem->ActiveDeliveries; int i = 1; @@ -786,6 +821,11 @@ void smtp_do_procmsg(long msgnum, void *userdata) { if (ThisItem->Active == 1) { int KeepBuffers = (i == m); + + nActivated++; + if (nActivated % ndelay_count == 0) + usleep(delay_msec); + if (i > 1) n = MsgCount++; syslog(LOG_DEBUG, "SMTPQ: Trying <%ld> <%s> %d / %d \n", @@ -962,8 +1002,25 @@ void cmd_smtp(char *argbuf) { CTDL_MODULE_INIT(smtp_queu) { + char *pstr; + if (!threading) { + pstr = getenv("CITSERVER_n_session_max"); + if ((pstr != NULL) && (*pstr != '\0')) + max_sessions_for_outbound_smtp = atol(pstr); /* how many sessions might be active till we stop adding more smtp jobs */ + + pstr = getenv("CITSERVER_smtp_n_delay_count"); + if ((pstr != NULL) && (*pstr != '\0')) + ndelay_count = atol(pstr); /* every n queued messages we will sleep... */ + + pstr = getenv("CITSERVER_smtp_delay"); + if ((pstr != NULL) && (*pstr != '\0')) + delay_msec = atol(pstr) * 1000; /* this many seconds. */ + + + + CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send"); ActiveQItems = NewHash(1, lFlathash); pthread_mutex_init(&ActiveQItemsLock, NULL); -- 2.30.2