MailQ: remember & display the room for mailingilst Queue-Entries
[citadel.git] / citadel / modules / smtp / serv_smtpqueue.c
index 72e5bc231d60a0b1b47c7dd00239cdb4cebb765e..c4b46142a250eef6e7653c256bf0b4d9117ecd9a 100644 (file)
@@ -93,6 +93,9 @@ struct CitContext smtp_queue_CC;
 pthread_mutex_t ActiveQItemsLock;
 HashList *ActiveQItems  = NULL;
 HashList *QItemHandlers = NULL;
+int max_sessions_for_outbound_smtp = 500; /* how many sessions might be active till we stop adding more smtp jobs */
+int ndelay_count = 50; /* every n queued messages we will sleep... */
+int delay_msec = 5000; /* this many seconds. */
 
 static const long MaxRetry = SMTP_RETRY_INTERVAL * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2;
 int MsgCount            = 0;
@@ -167,6 +170,7 @@ void FreeQueItem(OneQueItem **Item)
        DeleteHash(&(*Item)->MailQEntries);
        FreeStrBuf(&(*Item)->EnvelopeFrom);
        FreeStrBuf(&(*Item)->BounceTo);
+       FreeStrBuf(&(*Item)->SenderRoom);
        FreeURL(&(*Item)->URL);
        free(*Item);
        Item = NULL;
@@ -293,6 +297,11 @@ StrBuf *SerializeQueueItem(OneQueItem *MyQItem)
                StrBufAppendBuf(QMessage, MyQItem->EnvelopeFrom, 0);
        }
 
+       if (StrLength(MyQItem->SenderRoom) > 0) {
+               StrBufAppendBufPlain(QMessage, HKEY("\nsource_room|"), 0);
+               StrBufAppendBuf(QMessage, MyQItem->SenderRoom, 0);
+       }
+
        StrBufAppendBufPlain(QMessage, HKEY("\nretry|"), 0);
        StrBufAppendPrintf(QMessage, "%ld",
                           MyQItem->Retry);
@@ -361,6 +370,13 @@ void QItem_Handle_BounceTo(OneQueItem *Item, StrBuf *Line, const char **Pos)
        StrBufExtract_NextToken(Item->BounceTo, Line, Pos, '|');
 }
 
+void QItem_Handle_SenderRoom(OneQueItem *Item, StrBuf *Line, const char **Pos)
+{
+       if (Item->SenderRoom == NULL)
+               Item->SenderRoom = NewStrBufPlain(NULL, StrLength(Line));
+       StrBufExtract_NextToken(Item->SenderRoom, Line, Pos, '|');
+}
+
 void QItem_Handle_Recipient(OneQueItem *Item, StrBuf *Line, const char **Pos)
 {
        if (Item->Current == NULL)
@@ -552,6 +568,17 @@ void smtpq_do_bounce(OneQueItem *MyQItem, StrBuf *OMsgTxt)
        StrBufAppendBuf(BounceMB, Msg, 0);
        FreeStrBuf(&Msg);
 
+       if (StrLength(MyQItem->SenderRoom) > 0)
+       {
+               StrBufAppendBufPlain(
+                       BounceMB,
+                       HKEY("The message was originaly posted in: "), 0);
+               StrBufAppendBuf(BounceMB, MyQItem->SenderRoom, 0);
+               StrBufAppendBufPlain(
+                       BounceMB,
+                       HKEY("\n"), 0);
+       }
+
        /* Attach the original message */
        StrBufAppendBufPlain(BounceMB, HKEY("--"), 0);
        StrBufAppendBuf(BounceMB, boundary, 0);
@@ -612,6 +639,7 @@ void smtpq_do_bounce(OneQueItem *MyQItem, StrBuf *OMsgTxt)
  * Called by smtp_do_queue() to handle an individual message.
  */
 void smtp_do_procmsg(long msgnum, void *userdata) {
+       int mynumsessions = num_sessions;
        struct CtdlMessage *msg = NULL;
        char *instr = NULL;
        StrBuf *PlainQItem;
@@ -626,6 +654,13 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
        int HaveBuffers = 0;
        StrBuf *Msg =NULL;
 
+       if (mynumsessions > max_sessions_for_outbound_smtp) {
+               syslog(LOG_DEBUG,
+                      "SMTP Queue: skipping because of num jobs %d > %d max_sessions_for_outbound_smtp",
+                      mynumsessions,
+                      max_sessions_for_outbound_smtp);
+       }
+
        syslog(LOG_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum);
        ///strcpy(envelope_from, "");
 
@@ -770,8 +805,32 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
        DeleteHashPos(&It);
 
        MyQItem->ActiveDeliveries = CountActiveQueueEntries(MyQItem);
+
+       /* failsafe against overload: 
+        * will we exceed the limit set? 
+        */
+       if ((MyQItem->ActiveDeliveries + mynumsessions > max_sessions_for_outbound_smtp) && 
+           /* if yes, did we reach more than half of the quota? */
+           ((mynumsessions * 2) > max_sessions_for_outbound_smtp) && 
+           /* if... would we ever fit into half of the quota?? */
+           (((MyQItem->ActiveDeliveries * 2)  < max_sessions_for_outbound_smtp)))
+       {
+               /* abort delivery for another time. */
+               syslog(LOG_DEBUG,
+                      "SMTP Queue: skipping because of num jobs %d + %ld > %d max_sessions_for_outbound_smtp",
+                      mynumsessions,
+                      MyQItem->ActiveDeliveries,
+                      max_sessions_for_outbound_smtp);
+
+               FreeQueItem(&MyQItem);
+
+               return;
+       }
+
+
        if (MyQItem->ActiveDeliveries > 0)
        {
+               int nActivated = 0;
                int n = MsgCount++;
                int m = MyQItem->ActiveDeliveries;
                int i = 1;
@@ -786,6 +845,11 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
                        if (ThisItem->Active == 1)
                        {
                                int KeepBuffers = (i == m);
+
+                               nActivated++;
+                               if (nActivated % ndelay_count == 0)
+                                       usleep(delay_msec);
+
                                if (i > 1) n = MsgCount++;
                                syslog(LOG_DEBUG,
                                       "SMTPQ: Trying <%ld> <%s> %d / %d \n",
@@ -904,7 +968,7 @@ void smtp_init_spoolout(void) {
         * Create the room.  This will silently fail if the room already
         * exists, and that's perfectly ok, because we want it to exist.
         */
-       CtdlCreateRoom(SMTP_SPOOLOUT_ROOM, 3, "", 0, 1, 0, VIEW_MAILBOX);
+       CtdlCreateRoom(SMTP_SPOOLOUT_ROOM, 3, "", 0, 1, 0, VIEW_QUEUE);
 
        /*
         * Make sure it's set to be a "system room" so it doesn't show up
@@ -962,8 +1026,25 @@ void cmd_smtp(char *argbuf) {
 
 CTDL_MODULE_INIT(smtp_queu)
 {
+       char *pstr;
+
        if (!threading)
        {
+               pstr = getenv("CITSERVER_n_session_max");
+               if ((pstr != NULL) && (*pstr != '\0'))
+                       max_sessions_for_outbound_smtp = atol(pstr); /* how many sessions might be active till we stop adding more smtp jobs */
+
+               pstr = getenv("CITSERVER_smtp_n_delay_count");
+               if ((pstr != NULL) && (*pstr != '\0'))
+                       ndelay_count = atol(pstr); /* every n queued messages we will sleep... */
+
+               pstr = getenv("CITSERVER_smtp_delay");
+               if ((pstr != NULL) && (*pstr != '\0'))
+                       delay_msec = atol(pstr) * 1000; /* this many seconds. */
+
+
+
+
                CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send");
                ActiveQItems = NewHash(1, lFlathash);
                pthread_mutex_init(&ActiveQItemsLock, NULL);
@@ -976,8 +1057,8 @@ CTDL_MODULE_INIT(smtp_queu)
                Put(QItemHandlers, HKEY("attempted"), QItem_Handle_Attempted, reference_free_handler);
                Put(QItemHandlers, HKEY("remote"), QItem_Handle_Recipient, reference_free_handler);
                Put(QItemHandlers, HKEY("bounceto"), QItem_Handle_BounceTo, reference_free_handler);
+               Put(QItemHandlers, HKEY("source_room"), QItem_Handle_SenderRoom, reference_free_handler);
                Put(QItemHandlers, HKEY("submitted"), QItem_Handle_Submitted, reference_free_handler);
-
                smtp_init_spoolout();
 
                CtdlRegisterCleanupHook(smtp_evq_cleanup);