SMTP-Client: implement tactics not to overload our peers
authorWilfried Goesgens <dothebart@citadel.org>
Tue, 21 Feb 2012 22:42:09 +0000 (23:42 +0100)
committerWilfried Goesgens <dothebart@citadel.org>
Tue, 21 Feb 2012 22:42:09 +0000 (23:42 +0100)
  - local context overload protection: try not to exceed our session limit (not possible if more mailinglist recipients than contexts)
  - rate limiting to reduce threat on DNS and Peers

citadel/modules/smtp/serv_smtpqueue.c

index 4853950445dd41cf3f607b1f8c969c24f01da98d..2d406f2e348f71b8e84c113f56ccb80e15e50eb1 100644 (file)
@@ -93,6 +93,9 @@ struct CitContext smtp_queue_CC;
 pthread_mutex_t ActiveQItemsLock;
 HashList *ActiveQItems  = NULL;
 HashList *QItemHandlers = NULL;
+int max_sessions_for_outbound_smtp = 500; /* how many sessions might be active till we stop adding more smtp jobs */
+int ndelay_count = 50; /* every n queued messages we will sleep... */
+int delay_msec = 5000; /* this many seconds. */
 
 static const long MaxRetry = SMTP_RETRY_INTERVAL * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2;
 int MsgCount            = 0;
@@ -612,6 +615,7 @@ void smtpq_do_bounce(OneQueItem *MyQItem, StrBuf *OMsgTxt)
  * Called by smtp_do_queue() to handle an individual message.
  */
 void smtp_do_procmsg(long msgnum, void *userdata) {
+       int mynumsessions = num_sessions;
        struct CtdlMessage *msg = NULL;
        char *instr = NULL;
        StrBuf *PlainQItem;
@@ -626,6 +630,13 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
        int HaveBuffers = 0;
        StrBuf *Msg =NULL;
 
+       if (mynumsessions > max_sessions_for_outbound_smtp) {
+               syslog(LOG_DEBUG,
+                      "SMTP Queue: skipping because of num jobs %d > %d max_sessions_for_outbound_smtp",
+                      mynumsessions,
+                      max_sessions_for_outbound_smtp);
+       }
+
        syslog(LOG_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum);
        ///strcpy(envelope_from, "");
 
@@ -770,8 +781,32 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
        DeleteHashPos(&It);
 
        MyQItem->ActiveDeliveries = CountActiveQueueEntries(MyQItem);
+
+       /* failsafe against overload: 
+        * will we exceed the limit set? 
+        */
+       if ((MyQItem->ActiveDeliveries + mynumsessions > max_sessions_for_outbound_smtp) && 
+           /* if yes, did we reach more than half of the quota? */
+           ((mynumsessions * 2) > max_sessions_for_outbound_smtp) && 
+           /* if... would we ever fit into half of the quota?? */
+           (((MyQItem->ActiveDeliveries * 2)  < max_sessions_for_outbound_smtp)))
+       {
+               /* abort delivery for another time. */
+               syslog(LOG_DEBUG,
+                      "SMTP Queue: skipping because of num jobs %d + %ld > %d max_sessions_for_outbound_smtp",
+                      mynumsessions,
+                      MyQItem->ActiveDeliveries,
+                      max_sessions_for_outbound_smtp);
+
+               FreeQueItem(&MyQItem);
+
+               return;
+       }
+
+
        if (MyQItem->ActiveDeliveries > 0)
        {
+               int nActivated = 0;
                int n = MsgCount++;
                int m = MyQItem->ActiveDeliveries;
                int i = 1;
@@ -786,6 +821,11 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
                        if (ThisItem->Active == 1)
                        {
                                int KeepBuffers = (i == m);
+
+                               nActivated++;
+                               if (nActivated % ndelay_count == 0)
+                                       usleep(delay_msec);
+
                                if (i > 1) n = MsgCount++;
                                syslog(LOG_DEBUG,
                                       "SMTPQ: Trying <%ld> <%s> %d / %d \n",
@@ -962,8 +1002,25 @@ void cmd_smtp(char *argbuf) {
 
 CTDL_MODULE_INIT(smtp_queu)
 {
+       char *pstr;
+
        if (!threading)
        {
+               pstr = getenv("CITSERVER_n_session_max");
+               if ((pstr != NULL) && (*pstr != '\0'))
+                       max_sessions_for_outbound_smtp = atol(pstr); /* how many sessions might be active till we stop adding more smtp jobs */
+
+               pstr = getenv("CITSERVER_smtp_n_delay_count");
+               if ((pstr != NULL) && (*pstr != '\0'))
+                       ndelay_count = atol(pstr); /* every n queued messages we will sleep... */
+
+               pstr = getenv("CITSERVER_smtp_delay");
+               if ((pstr != NULL) && (*pstr != '\0'))
+                       delay_msec = atol(pstr) * 1000; /* this many seconds. */
+
+
+
+
                CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send");
                ActiveQItems = NewHash(1, lFlathash);
                pthread_mutex_init(&ActiveQItemsLock, NULL);