From: Wilfried Goesgens Date: Tue, 21 Feb 2012 22:47:25 +0000 (+0100) Subject: Merge branch 'master' of ssh://git.citadel.org/appl/gitroot/citadel X-Git-Tag: v8.11~184 X-Git-Url: https://code.citadel.org/?p=citadel.git;a=commitdiff_plain;h=0c9601dbc42e6eb56a75a0e39bada0199616391a;hp=dcf06d1545728cceba8f1395dd3aaa9f1203d9d3 Merge branch 'master' of ssh://git.citadel.org/appl/gitroot/citadel --- diff --git a/citadel/modules/smtp/serv_smtpqueue.c b/citadel/modules/smtp/serv_smtpqueue.c index 485395044..2d406f2e3 100644 --- a/citadel/modules/smtp/serv_smtpqueue.c +++ b/citadel/modules/smtp/serv_smtpqueue.c @@ -93,6 +93,9 @@ struct CitContext smtp_queue_CC; pthread_mutex_t ActiveQItemsLock; HashList *ActiveQItems = NULL; HashList *QItemHandlers = NULL; +int max_sessions_for_outbound_smtp = 500; /* how many sessions might be active till we stop adding more smtp jobs */ +int ndelay_count = 50; /* every n queued messages we will sleep... */ +int delay_msec = 5000; /* this many seconds. */ static const long MaxRetry = SMTP_RETRY_INTERVAL * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2; int MsgCount = 0; @@ -612,6 +615,7 @@ void smtpq_do_bounce(OneQueItem *MyQItem, StrBuf *OMsgTxt) * Called by smtp_do_queue() to handle an individual message. */ void smtp_do_procmsg(long msgnum, void *userdata) { + int mynumsessions = num_sessions; struct CtdlMessage *msg = NULL; char *instr = NULL; StrBuf *PlainQItem; @@ -626,6 +630,13 @@ void smtp_do_procmsg(long msgnum, void *userdata) { int HaveBuffers = 0; StrBuf *Msg =NULL; + if (mynumsessions > max_sessions_for_outbound_smtp) { + syslog(LOG_DEBUG, + "SMTP Queue: skipping because of num jobs %d > %d max_sessions_for_outbound_smtp", + mynumsessions, + max_sessions_for_outbound_smtp); + } + syslog(LOG_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum); ///strcpy(envelope_from, ""); @@ -770,8 +781,32 @@ void smtp_do_procmsg(long msgnum, void *userdata) { DeleteHashPos(&It); MyQItem->ActiveDeliveries = CountActiveQueueEntries(MyQItem); + + /* failsafe against overload: + * will we exceed the limit set? + */ + if ((MyQItem->ActiveDeliveries + mynumsessions > max_sessions_for_outbound_smtp) && + /* if yes, did we reach more than half of the quota? */ + ((mynumsessions * 2) > max_sessions_for_outbound_smtp) && + /* if... would we ever fit into half of the quota?? */ + (((MyQItem->ActiveDeliveries * 2) < max_sessions_for_outbound_smtp))) + { + /* abort delivery for another time. */ + syslog(LOG_DEBUG, + "SMTP Queue: skipping because of num jobs %d + %ld > %d max_sessions_for_outbound_smtp", + mynumsessions, + MyQItem->ActiveDeliveries, + max_sessions_for_outbound_smtp); + + FreeQueItem(&MyQItem); + + return; + } + + if (MyQItem->ActiveDeliveries > 0) { + int nActivated = 0; int n = MsgCount++; int m = MyQItem->ActiveDeliveries; int i = 1; @@ -786,6 +821,11 @@ void smtp_do_procmsg(long msgnum, void *userdata) { if (ThisItem->Active == 1) { int KeepBuffers = (i == m); + + nActivated++; + if (nActivated % ndelay_count == 0) + usleep(delay_msec); + if (i > 1) n = MsgCount++; syslog(LOG_DEBUG, "SMTPQ: Trying <%ld> <%s> %d / %d \n", @@ -962,8 +1002,25 @@ void cmd_smtp(char *argbuf) { CTDL_MODULE_INIT(smtp_queu) { + char *pstr; + if (!threading) { + pstr = getenv("CITSERVER_n_session_max"); + if ((pstr != NULL) && (*pstr != '\0')) + max_sessions_for_outbound_smtp = atol(pstr); /* how many sessions might be active till we stop adding more smtp jobs */ + + pstr = getenv("CITSERVER_smtp_n_delay_count"); + if ((pstr != NULL) && (*pstr != '\0')) + ndelay_count = atol(pstr); /* every n queued messages we will sleep... */ + + pstr = getenv("CITSERVER_smtp_delay"); + if ((pstr != NULL) && (*pstr != '\0')) + delay_msec = atol(pstr) * 1000; /* this many seconds. */ + + + + CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send"); ActiveQItems = NewHash(1, lFlathash); pthread_mutex_init(&ActiveQItemsLock, NULL); diff --git a/citadel/threads.c b/citadel/threads.c index 59aae25d2..a9fce4eb4 100644 --- a/citadel/threads.c +++ b/citadel/threads.c @@ -207,7 +207,7 @@ void go_threading(void) if ((active_workers == num_workers) && (num_workers < 256)) { CtdlThreadCreate(worker_thread); } - sleep(1); + usleep(1000); } /* When we get to this point we are getting ready to shut down our Citadel server */ @@ -221,6 +221,6 @@ void go_threading(void) syslog(LOG_DEBUG, "Waiting %d seconds for %d worker threads to exit", countdown, num_workers ); - sleep(1); + usleep(1000); } }