MailQ: remember & display the room for mailinglist Queue-Entries
[citadel.git] / citadel / modules / smtp / serv_smtpqueue.c
1 /*
2  * This module is an SMTP and ESMTP implementation for the Citadel system.
3  * It is compliant with all of the following:
4  *
5  * RFC  821 - Simple Mail Transfer Protocol
6  * RFC  876 - Survey of SMTP Implementations
7  * RFC 1047 - Duplicate messages and SMTP
8  * RFC 1652 - 8 bit MIME
9  * RFC 1869 - Extended Simple Mail Transfer Protocol
10  * RFC 1870 - SMTP Service Extension for Message Size Declaration
11  * RFC 2033 - Local Mail Transfer Protocol
12  * RFC 2197 - SMTP Service Extension for Command Pipelining
13  * RFC 2476 - Message Submission
14  * RFC 2487 - SMTP Service Extension for Secure SMTP over TLS
15  * RFC 2554 - SMTP Service Extension for Authentication
16  * RFC 2821 - Simple Mail Transfer Protocol
17  * RFC 2822 - Internet Message Format
18  * RFC 2920 - SMTP Service Extension for Command Pipelining
19  *  
20  * The VRFY and EXPN commands have been removed from this implementation
21  * because nobody uses these commands anymore, except for spammers.
22  *
23  * Copyright (c) 1998-2012 by the citadel.org team
24  *
25  *  This program is open source software; you can redistribute it and/or modify
26  *  it under the terms of the GNU General Public License version 3.
27  *  
28  *  
29  *
30  *  This program is distributed in the hope that it will be useful,
31  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
32  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
33  *  GNU General Public License for more details.
34  *
35  *  
36  *  
37  *  
38  */
39
40 #include "sysdep.h"
41 #include <stdlib.h>
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <termios.h>
45 #include <fcntl.h>
46 #include <signal.h>
47 #include <pwd.h>
48 #include <errno.h>
49 #include <sys/types.h>
50 #include <syslog.h>
51
52 #if TIME_WITH_SYS_TIME
53 # include <sys/time.h>
54 # include <time.h>
55 #else
56 # if HAVE_SYS_TIME_H
57 #  include <sys/time.h>
58 # else
59 #  include <time.h>
60 # endif
61 #endif
62 #include <sys/wait.h>
63 #include <ctype.h>
64 #include <string.h>
65 #include <limits.h>
66 #include <sys/socket.h>
67 #include <netinet/in.h>
68 #include <arpa/inet.h>
69 #include <libcitadel.h>
70 #include "citadel.h"
71 #include "server.h"
72 #include "citserver.h"
73 #include "support.h"
74 #include "config.h"
75 #include "control.h"
76 #include "user_ops.h"
77 #include "database.h"
78 #include "msgbase.h"
79 #include "internet_addressing.h"
80 #include "genstamp.h"
81 #include "domain.h"
82 #include "clientsocket.h"
83 #include "locate_host.h"
84 #include "citadel_dirs.h"
85
86 #include "ctdl_module.h"
87
88 #include "smtpqueue.h"
89 #include "event_client.h"
90
91
92 struct CitContext smtp_queue_CC;
93 pthread_mutex_t ActiveQItemsLock;
94 HashList *ActiveQItems  = NULL;
95 HashList *QItemHandlers = NULL;
96 int max_sessions_for_outbound_smtp = 500; /* how many sessions might be active till we stop adding more smtp jobs */
97 int ndelay_count = 50; /* every n queued messages we will sleep... */
98 int delay_msec = 5000; /* this many seconds. */
99
100 static const long MaxRetry = SMTP_RETRY_INTERVAL * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2;
101 int MsgCount            = 0;
102 int run_queue_now       = 0;    /* Set to 1 to ignore SMTP send retry times */
103
104 void smtp_try_one_queue_entry(OneQueItem *MyQItem,
105                               MailQEntry *MyQEntry,
106                               StrBuf *MsgText,
107 /* KeepMsgText allows us to use MsgText as ours. */
108                               int KeepMsgText,
109                               int MsgCount,
110                               ParsedURL *RelayUrls);
111
112
113 void smtp_evq_cleanup(void)
114 {
115
116         pthread_mutex_lock(&ActiveQItemsLock);
117         DeleteHash(&QItemHandlers);
118         DeleteHash(&ActiveQItems);
119         pthread_mutex_unlock(&ActiveQItemsLock);
120         pthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
121 /*      citthread_mutex_destroy(&ActiveQItemsLock); TODO */
122 }
123
124 int DecreaseQReference(OneQueItem *MyQItem)
125 {
126         int IDestructQueItem;
127
128         pthread_mutex_lock(&ActiveQItemsLock);
129         MyQItem->ActiveDeliveries--;
130         IDestructQueItem = MyQItem->ActiveDeliveries == 0;
131         pthread_mutex_unlock(&ActiveQItemsLock);
132         return IDestructQueItem;
133 }
134
135 void RemoveQItem(OneQueItem *MyQItem)
136 {
137         long len;
138         const char* Key;
139         void *VData;
140         HashPos  *It;
141
142         pthread_mutex_lock(&ActiveQItemsLock);
143         It = GetNewHashPos(ActiveQItems, 0);
144         if (GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It))
145                 DeleteEntryFromHash(ActiveQItems, It);
146         else
147         {
148                 syslog(LOG_WARNING,
149                        "SMTP cleanup: unable to find QItem with ID[%ld]",
150                        MyQItem->MessageID);
151                 while (GetNextHashPos(ActiveQItems, It, &len, &Key, &VData))
152                         syslog(LOG_WARNING,
153                                "SMTP cleanup: have_: ID[%ld]",
154                                ((OneQueItem *)VData)->MessageID);
155         }
156         pthread_mutex_unlock(&ActiveQItemsLock);
157         DeleteHashPos(&It);
158 }
159
160
161 void FreeMailQEntry(void *qv)
162 {
163         MailQEntry *Q = qv;
164         FreeStrBuf(&Q->Recipient);
165         FreeStrBuf(&Q->StatusMessage);
166         free(Q);
167 }
168 void FreeQueItem(OneQueItem **Item)
169 {
170         DeleteHash(&(*Item)->MailQEntries);
171         FreeStrBuf(&(*Item)->EnvelopeFrom);
172         FreeStrBuf(&(*Item)->BounceTo);
173         FreeStrBuf(&(*Item)->SenderRoom);
174         FreeURL(&(*Item)->URL);
175         free(*Item);
176         Item = NULL;
177 }
178 void HFreeQueItem(void *Item)
179 {
180         FreeQueItem((OneQueItem**)&Item);
181 }
182
183 /* inspect recipients with a status of:
184  * - 0 (no delivery yet attempted)
185  * - 3/4 (transient errors
186  *        were experienced and it's time to try again)
187  */
188 int CountActiveQueueEntries(OneQueItem *MyQItem)
189 {
190         HashPos  *It;
191         long len;
192         long ActiveDeliveries;
193         const char *Key;
194         void *vQE;
195
196         ActiveDeliveries = 0;
197         It = GetNewHashPos(MyQItem->MailQEntries, 0);
198         while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
199         {
200                 MailQEntry *ThisItem = vQE;
201                 if ((ThisItem->Status == 0) ||
202                     (ThisItem->Status == 3) ||
203                     (ThisItem->Status == 4))
204                 {
205                         ActiveDeliveries++;
206                         ThisItem->Active = 1;
207                 }
208                 else
209                         ThisItem->Active = 0;
210         }
211         DeleteHashPos(&It);
212         return ActiveDeliveries;
213 }
214
215 OneQueItem *DeserializeQueueItem(StrBuf *RawQItem, long QueMsgID)
216 {
217         OneQueItem *Item;
218         const char *pLine = NULL;
219         StrBuf *Line;
220         StrBuf *Token;
221         void *v;
222
223         Item = (OneQueItem*)malloc(sizeof(OneQueItem));
224         memset(Item, 0, sizeof(OneQueItem));
225         Item->Retry = SMTP_RETRY_INTERVAL;
226         Item->MessageID = -1;
227         Item->QueMsgID = QueMsgID;
228
229         Token = NewStrBuf();
230         Line = NewStrBufPlain(NULL, 128);
231         while (pLine != StrBufNOTNULL) {
232                 const char *pItemPart = NULL;
233                 void *vHandler;
234
235                 StrBufExtract_NextToken(Line, RawQItem, &pLine, '\n');
236                 if (StrLength(Line) == 0) continue;
237                 StrBufExtract_NextToken(Token, Line, &pItemPart, '|');
238                 if (GetHash(QItemHandlers, SKEY(Token), &vHandler))
239                 {
240                         QItemHandler H;
241                         H = (QItemHandler) vHandler;
242                         H(Item, Line, &pItemPart);
243                 }
244         }
245         FreeStrBuf(&Line);
246         FreeStrBuf(&Token);
247
248         if (Item->Retry >= MaxRetry)
249                 Item->FailNow = 1;
250
251         pthread_mutex_lock(&ActiveQItemsLock);
252         if (GetHash(ActiveQItems,
253                     LKEY(Item->MessageID),
254                     &v))
255         {
256                 /* WHOOPS. somebody else is already working on this. */
257                 pthread_mutex_unlock(&ActiveQItemsLock);
258                 FreeQueItem(&Item);
259                 return NULL;
260         }
261         else {
262                 /* mark our claim on this. */
263                 Put(ActiveQItems,
264                     LKEY(Item->MessageID),
265                     Item,
266                     HFreeQueItem);
267                 pthread_mutex_unlock(&ActiveQItemsLock);
268         }
269
270         return Item;
271 }
272
273 StrBuf *SerializeQueueItem(OneQueItem *MyQItem)
274 {
275         StrBuf *QMessage;
276         HashPos  *It;
277         const char *Key;
278         long len;
279         void *vQE;
280
281         QMessage = NewStrBufPlain(NULL, SIZ);
282         StrBufPrintf(QMessage, "Content-type: %s\n", SPOOLMIME);
283 //      "attempted|%ld\n"  "retry|%ld\n",, (long)time(NULL), (long)retry );
284         StrBufAppendBufPlain(QMessage, HKEY("\nmsgid|"), 0);
285         StrBufAppendPrintf(QMessage, "%ld", MyQItem->MessageID);
286
287         StrBufAppendBufPlain(QMessage, HKEY("\nsubmitted|"), 0);
288         StrBufAppendPrintf(QMessage, "%ld", MyQItem->Submitted);
289
290         if (StrLength(MyQItem->BounceTo) > 0) {
291                 StrBufAppendBufPlain(QMessage, HKEY("\nbounceto|"), 0);
292                 StrBufAppendBuf(QMessage, MyQItem->BounceTo, 0);
293         }
294
295         if (StrLength(MyQItem->EnvelopeFrom) > 0) {
296                 StrBufAppendBufPlain(QMessage, HKEY("\nenvelope_from|"), 0);
297                 StrBufAppendBuf(QMessage, MyQItem->EnvelopeFrom, 0);
298         }
299
300         if (StrLength(MyQItem->SenderRoom) > 0) {
301                 StrBufAppendBufPlain(QMessage, HKEY("\nsource_room|"), 0);
302                 StrBufAppendBuf(QMessage, MyQItem->SenderRoom, 0);
303         }
304
305         StrBufAppendBufPlain(QMessage, HKEY("\nretry|"), 0);
306         StrBufAppendPrintf(QMessage, "%ld",
307                            MyQItem->Retry);
308
309         StrBufAppendBufPlain(QMessage, HKEY("\nattempted|"), 0);
310         StrBufAppendPrintf(QMessage, "%ld",
311                            MyQItem->ReattemptWhen);
312
313         It = GetNewHashPos(MyQItem->MailQEntries, 0);
314         while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
315         {
316                 MailQEntry *ThisItem = vQE;
317
318                 if (!ThisItem->Active)
319                 {
320                         /* skip already sent ones from the spoolfile. */
321                         continue;
322                 }
323                 StrBufAppendBufPlain(QMessage, HKEY("\nremote|"), 0);
324                 StrBufAppendBuf(QMessage, ThisItem->Recipient, 0);
325                 StrBufAppendBufPlain(QMessage, HKEY("|"), 0);
326                 StrBufAppendPrintf(QMessage, "%d", ThisItem->Status);
327                 StrBufAppendBufPlain(QMessage, HKEY("|"), 0);
328                 StrBufAppendBuf(QMessage, ThisItem->StatusMessage, 0);
329         }
330         DeleteHashPos(&It);
331         StrBufAppendBufPlain(QMessage, HKEY("\n"), 0);
332         return QMessage;
333 }
334
335
336
337
338
339 void NewMailQEntry(OneQueItem *Item)
340 {
341         Item->Current = (MailQEntry*) malloc(sizeof(MailQEntry));
342         memset(Item->Current, 0, sizeof(MailQEntry));
343
344         if (Item->MailQEntries == NULL)
345                 Item->MailQEntries = NewHash(1, Flathash);
346         Item->Current->StatusMessage = NewStrBuf();
347         Item->Current->n = GetCount(Item->MailQEntries);
348         Put(Item->MailQEntries,
349             IKEY(Item->Current->n),
350             Item->Current,
351             FreeMailQEntry);
352 }
353
354 void QItem_Handle_MsgID(OneQueItem *Item, StrBuf *Line, const char **Pos)
355 {
356         Item->MessageID = StrBufExtractNext_long(Line, Pos, '|');
357 }
358
359 void QItem_Handle_EnvelopeFrom(OneQueItem *Item, StrBuf *Line, const char **Pos)
360 {
361         if (Item->EnvelopeFrom == NULL)
362                 Item->EnvelopeFrom = NewStrBufPlain(NULL, StrLength(Line));
363         StrBufExtract_NextToken(Item->EnvelopeFrom, Line, Pos, '|');
364 }
365
366 void QItem_Handle_BounceTo(OneQueItem *Item, StrBuf *Line, const char **Pos)
367 {
368         if (Item->BounceTo == NULL)
369                 Item->BounceTo = NewStrBufPlain(NULL, StrLength(Line));
370         StrBufExtract_NextToken(Item->BounceTo, Line, Pos, '|');
371 }
372
373 void QItem_Handle_SenderRoom(OneQueItem *Item, StrBuf *Line, const char **Pos)
374 {
375         if (Item->SenderRoom == NULL)
376                 Item->SenderRoom = NewStrBufPlain(NULL, StrLength(Line));
377         StrBufExtract_NextToken(Item->SenderRoom, Line, Pos, '|');
378 }
379
380 void QItem_Handle_Recipient(OneQueItem *Item, StrBuf *Line, const char **Pos)
381 {
382         if (Item->Current == NULL)
383                 NewMailQEntry(Item);
384         if (Item->Current->Recipient == NULL)
385                 Item->Current->Recipient=NewStrBufPlain(NULL, StrLength(Line));
386         StrBufExtract_NextToken(Item->Current->Recipient, Line, Pos, '|');
387         Item->Current->Status = StrBufExtractNext_int(Line, Pos, '|');
388         StrBufExtract_NextToken(Item->Current->StatusMessage, Line, Pos, '|');
389         Item->Current = NULL; // TODO: is this always right?
390 }
391
392
393 void QItem_Handle_retry(OneQueItem *Item, StrBuf *Line, const char **Pos)
394 {
395         Item->Retry =
396                 StrBufExtractNext_int(Line, Pos, '|');
397         Item->Retry *= 2;
398 }
399
400
401 void QItem_Handle_Submitted(OneQueItem *Item, StrBuf *Line, const char **Pos)
402 {
403         Item->Submitted = atol(*Pos);
404
405 }
406
407 void QItem_Handle_Attempted(OneQueItem *Item, StrBuf *Line, const char **Pos)
408 {
409         Item->ReattemptWhen = StrBufExtractNext_int(Line, Pos, '|');
410 }
411
412
413
414 /**
415  * this one has to have the context for loading the message via the redirect buffer...
416  */
417 StrBuf *smtp_load_msg(OneQueItem *MyQItem, int n)
418 {
419         CitContext *CCC=CC;
420         StrBuf *SendMsg;
421
422         CCC->redirect_buffer = NewStrBufPlain(NULL, SIZ);
423         CtdlOutputMsg(MyQItem->MessageID,
424                       MT_RFC822, HEADERS_ALL,
425                       0, 1, NULL,
426                       (ESC_DOT|SUPPRESS_ENV_TO) );
427
428         SendMsg = CCC->redirect_buffer;
429         CCC->redirect_buffer = NULL;
430         if ((StrLength(SendMsg) > 0) &&
431             ChrPtr(SendMsg)[StrLength(SendMsg) - 1] != '\n') {
432                 syslog(LOG_WARNING,
433                        "SMTP client[%d]: Possible problem: message did not "
434                        "correctly terminate. (expecting 0x10, got 0x%02x)\n",
435                        MsgCount, //yes uncool, but best choice here...
436                        ChrPtr(SendMsg)[StrLength(SendMsg) - 1] );
437                 StrBufAppendBufPlain(SendMsg, HKEY("\r\n"), 0);
438         }
439         return SendMsg;
440 }
441
442
443
444 /*
445  * smtp_do_bounce() is caled by smtp_do_procmsg() to scan a set of delivery
446  * instructions for "5" codes (permanent fatal errors) and produce/deliver
447  * a "bounce" message (delivery status notification).
448  */
449 void smtpq_do_bounce(OneQueItem *MyQItem, StrBuf *OMsgTxt)
450 {
451         static int seq = 0;
452
453         struct CtdlMessage *bmsg = NULL;
454         StrBuf *boundary;
455         StrBuf *Msg = NULL;
456         StrBuf *BounceMB;
457         struct recptypes *valid;
458
459         HashPos *It;
460         void *vQE;
461         long len;
462         const char *Key;
463
464         int successful_bounce = 0;
465         int num_bounces = 0;
466         int give_up = 0;
467
468         syslog(LOG_DEBUG, "smtp_do_bounce() called\n");
469
470         if ( (ev_time() - MyQItem->Submitted) > SMTP_GIVE_UP ) {
471                 give_up = 1;/// TODO: replace time by libevq timer get
472         }
473
474         /*
475          * Now go through the instructions checking for stuff.
476          */
477         It = GetNewHashPos(MyQItem->MailQEntries, 0);
478         while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
479         {
480                 MailQEntry *ThisItem = vQE;
481                 if ((ThisItem->Status == 5) || /* failed now? */
482                     ((give_up == 1) &&
483                      (ThisItem->Status != 2)))
484                         /* giving up after failed attempts... */
485                 {
486                         if (num_bounces == 0)
487                                 Msg = NewStrBufPlain(NULL, 1024);
488                         ++num_bounces;
489
490                         StrBufAppendBuf(Msg, ThisItem->Recipient, 0);
491                         StrBufAppendBufPlain(Msg, HKEY(": "), 0);
492                         StrBufAppendBuf(Msg, ThisItem->StatusMessage, 0);
493                         StrBufAppendBufPlain(Msg, HKEY("\r\n"), 0);
494                 }
495         }
496         DeleteHashPos(&It);
497
498         /* Deliver the bounce if there's anything worth mentioning */
499         syslog(LOG_DEBUG, "num_bounces = %d\n", num_bounces);
500
501         if (num_bounces == 0) {
502                 FreeStrBuf(&Msg);
503                 return;
504         }
505
506         boundary = NewStrBufPlain(HKEY("=_Citadel_Multipart_"));
507         StrBufAppendPrintf(boundary,
508                            "%s_%04x%04x",
509                            config.c_fqdn,
510                            getpid(),
511                            ++seq);
512
513         /* Start building our bounce message; go shopping for memory first. */
514         BounceMB = NewStrBufPlain(
515                 NULL,
516                 1024 + /* mime stuff.... */
517                 StrLength(Msg) +  /* the bounce information... */
518                 StrLength(OMsgTxt)); /* the original message */
519         if (BounceMB == NULL) {
520                 FreeStrBuf(&boundary);
521                 syslog(LOG_ERR, "Failed to alloc() bounce message.\n");
522
523                 return;
524         }
525
526         bmsg = (struct CtdlMessage *) malloc(sizeof(struct CtdlMessage));
527         if (bmsg == NULL) {
528                 FreeStrBuf(&boundary);
529                 FreeStrBuf(&BounceMB);
530                 syslog(LOG_ERR, "Failed to alloc() bounce message.\n");
531
532                 return;
533         }
534         memset(bmsg, 0, sizeof(struct CtdlMessage));
535
536
537         StrBufAppendBufPlain(BounceMB, HKEY("Content-type: multipart/mixed; boundary=\""), 0);
538         StrBufAppendBuf(BounceMB, boundary, 0);
539         StrBufAppendBufPlain(BounceMB, HKEY("\"\r\n"), 0);
540         StrBufAppendBufPlain(BounceMB, HKEY("MIME-Version: 1.0\r\n"), 0);
541         StrBufAppendBufPlain(BounceMB, HKEY("X-Mailer: " CITADEL "\r\n"), 0);
542         StrBufAppendBufPlain(BounceMB, HKEY("\r\nThis is a multipart message in MIME format.\r\n\r\n"), 0);
543         StrBufAppendBufPlain(BounceMB, HKEY("--"), 0);
544         StrBufAppendBuf(BounceMB, boundary, 0);
545         StrBufAppendBufPlain(BounceMB, HKEY("\r\n"), 0);
546         StrBufAppendBufPlain(BounceMB, HKEY("Content-type: text/plain\r\n\r\n"), 0);
547
548         if (give_up)
549                 StrBufAppendBufPlain(
550                         BounceMB,
551                         HKEY(
552                                 "A message you sent could not be delivered "
553                                 "to some or all of its recipients\n"
554                                 "due to prolonged unavailability "
555                                 "of its destination(s).\n"
556                                 "Giving up on the following addresses:\n\n"
557                                 ), 0);
558         else
559                 StrBufAppendBufPlain(
560                         BounceMB,
561                         HKEY(
562                                 "A message you sent could not be delivered "
563                                 "to some or all of its recipients.\n"
564                                 "The following addresses "
565                                 "were undeliverable:\n\n"
566                                 ), 0);
567
568         StrBufAppendBuf(BounceMB, Msg, 0);
569         FreeStrBuf(&Msg);
570
571         if (StrLength(MyQItem->SenderRoom) > 0)
572         {
573                 StrBufAppendBufPlain(
574                         BounceMB,
575                         HKEY("The message was originaly posted in: "), 0);
576                 StrBufAppendBuf(BounceMB, MyQItem->SenderRoom, 0);
577                 StrBufAppendBufPlain(
578                         BounceMB,
579                         HKEY("\n"), 0);
580         }
581
582         /* Attach the original message */
583         StrBufAppendBufPlain(BounceMB, HKEY("--"), 0);
584         StrBufAppendBuf(BounceMB, boundary, 0);
585         StrBufAppendBufPlain(BounceMB, HKEY("\r\n"), 0);
586         StrBufAppendBufPlain(BounceMB,
587                              HKEY("Content-type: message/rfc822\r\n"), 0);
588         StrBufAppendBufPlain(BounceMB,
589                              HKEY("Content-Transfer-Encoding: 7bit\r\n"), 0);
590         StrBufAppendBufPlain(BounceMB,
591                              HKEY("Content-Disposition: inline\r\n"), 0);
592         StrBufAppendBufPlain(BounceMB, HKEY("\r\n"), 0);
593         StrBufAppendBuf(BounceMB, OMsgTxt, 0);
594
595         /* Close the multipart MIME scope */
596         StrBufAppendBufPlain(BounceMB, HKEY("--"), 0);
597         StrBufAppendBuf(BounceMB, boundary, 0);
598         StrBufAppendBufPlain(BounceMB, HKEY("--\r\n"), 0);
599
600         bmsg->cm_magic = CTDLMESSAGE_MAGIC;
601         bmsg->cm_anon_type = MES_NORMAL;
602         bmsg->cm_format_type = FMT_RFC822;
603
604         bmsg->cm_fields['O'] = strdup(MAILROOM);
605         bmsg->cm_fields['A'] = strdup("Citadel");
606         bmsg->cm_fields['N'] = strdup(config.c_nodename);
607         bmsg->cm_fields['U'] = strdup("Delivery Status Notification (Failure)");
608         bmsg->cm_fields['M'] = SmashStrBuf(&BounceMB);
609
610         /* First try the user who sent the message */
611         if (StrLength(MyQItem->BounceTo) == 0)
612                 syslog(LOG_ERR, "No bounce address specified\n");
613         else
614                 syslog(LOG_DEBUG, "bounce to user? <%s>\n",
615                        ChrPtr(MyQItem->BounceTo));
616
617         /* Can we deliver the bounce to the original sender? */
618         valid = validate_recipients(ChrPtr(MyQItem->BounceTo), NULL, 0);
619         if ((valid != NULL) && (valid->num_error == 0)) {
620                 CtdlSubmitMsg(bmsg, valid, "", QP_EADDR);
621                 successful_bounce = 1;
622         }
623
624         /* If not, post it in the Aide> room */
625         if (successful_bounce == 0) {
626                 CtdlSubmitMsg(bmsg, NULL, config.c_aideroom, QP_EADDR);
627         }
628
629         /* Free up the memory we used */
630         free_recipients(valid);
631         FreeStrBuf(&boundary);
632         CtdlFreeMessage(bmsg);
633         syslog(LOG_DEBUG, "Done processing bounces\n");
634 }
635
636 /*
637  * smtp_do_procmsg()
638  *
639  * Called by smtp_do_queue() to handle an individual message.
640  */
641 void smtp_do_procmsg(long msgnum, void *userdata) {
642         int mynumsessions = num_sessions;
643         struct CtdlMessage *msg = NULL;
644         char *instr = NULL;
645         StrBuf *PlainQItem;
646         OneQueItem *MyQItem;
647         char *pch;
648         HashPos  *It;
649         void *vQE;
650         long len;
651         const char *Key;
652         int nRelays = 0;
653         ParsedURL *RelayUrls = NULL;
654         int HaveBuffers = 0;
655         StrBuf *Msg =NULL;
656
657         if (mynumsessions > max_sessions_for_outbound_smtp) {
658                 syslog(LOG_DEBUG,
659                        "SMTP Queue: skipping because of num jobs %d > %d max_sessions_for_outbound_smtp",
660                        mynumsessions,
661                        max_sessions_for_outbound_smtp);
662         }
663
664         syslog(LOG_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum);
665         ///strcpy(envelope_from, "");
666
667         msg = CtdlFetchMessage(msgnum, 1);
668         if (msg == NULL) {
669                 syslog(LOG_ERR, "SMTP Queue: tried %ld but no such message!\n",
670                        msgnum);
671                 return;
672         }
673
674         pch = instr = msg->cm_fields['M'];
675
676         /* Strip out the headers (no not amd any other non-instruction) line */
677         while (pch != NULL) {
678                 pch = strchr(pch, '\n');
679                 if ((pch != NULL) && (*(pch + 1) == '\n')) {
680                         instr = pch + 2;
681                         pch = NULL;
682                 }
683         }
684         PlainQItem = NewStrBufPlain(instr, -1);
685         CtdlFreeMessage(msg);
686         MyQItem = DeserializeQueueItem(PlainQItem, msgnum);
687         FreeStrBuf(&PlainQItem);
688
689         if (MyQItem == NULL) {
690                 syslog(LOG_ERR,
691                        "SMTP Queue: Msg No %ld: already in progress!\n",
692                        msgnum);
693                 return; /* s.b. else is already processing... */
694         }
695
696         /*
697          * Postpone delivery if we've already tried recently.
698          */
699         if ((MyQItem->ReattemptWhen != 0) && 
700             (time(NULL) < MyQItem->ReattemptWhen) &&
701             (run_queue_now == 0))
702         {
703                 syslog(LOG_DEBUG, "SMTP client: Retry time not yet reached.\n");
704
705                 It = GetNewHashPos(MyQItem->MailQEntries, 0);
706                 pthread_mutex_lock(&ActiveQItemsLock);
707                 {
708                         if (GetHashPosFromKey(ActiveQItems,
709                                               LKEY(MyQItem->MessageID),
710                                               It))
711                         {
712                                 DeleteEntryFromHash(ActiveQItems, It);
713                         }
714                 }
715                 pthread_mutex_unlock(&ActiveQItemsLock);
716                 ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
717                 DeleteHashPos(&It);
718                 return;
719         }
720
721         /*
722          * Bail out if there's no actual message associated with this
723          */
724         if (MyQItem->MessageID < 0L) {
725                 syslog(LOG_ERR, "SMTP Queue: no 'msgid' directive found!\n");
726                 It = GetNewHashPos(MyQItem->MailQEntries, 0);
727                 pthread_mutex_lock(&ActiveQItemsLock);
728                 {
729                         if (GetHashPosFromKey(ActiveQItems,
730                                               LKEY(MyQItem->MessageID),
731                                               It))
732                         {
733                                 DeleteEntryFromHash(ActiveQItems, It);
734                         }
735                 }
736                 pthread_mutex_unlock(&ActiveQItemsLock);
737                 DeleteHashPos(&It);
738                 ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
739                 return;
740         }
741
742         {
743                 char mxbuf[SIZ];
744                 ParsedURL **Url = &MyQItem->URL;
745                 nRelays = get_hosts(mxbuf, "smarthost");
746                 if (nRelays > 0) {
747                         StrBuf *All;
748                         StrBuf *One;
749                         const char *Pos = NULL;
750                         All = NewStrBufPlain(mxbuf, -1);
751                         One = NewStrBufPlain(NULL, StrLength(All) + 1);
752
753                         while ((Pos != StrBufNOTNULL) &&
754                                ((Pos == NULL) ||
755                                 !IsEmptyStr(Pos)))
756                         {
757                                 StrBufExtract_NextToken(One, All, &Pos, '|');
758                                 if (!ParseURL(Url, One, 25))
759                                         syslog(LOG_DEBUG,
760                                                "Failed to parse: %s\n",
761                                                ChrPtr(One));
762                                 else {
763                                         ///if (!Url->IsIP)) // todo dupe me fork ipv6
764                                         Url = &(*Url)->Next;
765                                 }
766                         }
767                         FreeStrBuf(&All);
768                         FreeStrBuf(&One);
769                 }
770
771                 Url = &MyQItem->FallBackHost;
772                 nRelays = get_hosts(mxbuf, "fallbackhost");
773                 if (nRelays > 0) {
774                         StrBuf *All;
775                         StrBuf *One;
776                         const char *Pos = NULL;
777                         All = NewStrBufPlain(mxbuf, -1);
778                         One = NewStrBufPlain(NULL, StrLength(All) + 1);
779
780                         while ((Pos != StrBufNOTNULL) &&
781                                ((Pos == NULL) ||
782                                 !IsEmptyStr(Pos)))
783                         {
784                                 StrBufExtract_NextToken(One, All, &Pos, '|');
785                                 if (!ParseURL(Url, One, 25))
786                                         syslog(LOG_DEBUG,
787                                                "Failed to parse: %s\n",
788                                                ChrPtr(One));
789                                 else
790                                         Url = &(*Url)->Next;
791                         }
792                         FreeStrBuf(&All);
793                         FreeStrBuf(&One);
794                 }
795         }
796
797         It = GetNewHashPos(MyQItem->MailQEntries, 0);
798         while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
799         {
800                 MailQEntry *ThisItem = vQE;
801                 syslog(LOG_DEBUG, "SMTP Queue: Task: <%s> %d\n",
802                        ChrPtr(ThisItem->Recipient),
803                        ThisItem->Active);
804         }
805         DeleteHashPos(&It);
806
807         MyQItem->ActiveDeliveries = CountActiveQueueEntries(MyQItem);
808
809         /* failsafe against overload: 
810          * will we exceed the limit set? 
811          */
812         if ((MyQItem->ActiveDeliveries + mynumsessions > max_sessions_for_outbound_smtp) && 
813             /* if yes, did we reach more than half of the quota? */
814             ((mynumsessions * 2) > max_sessions_for_outbound_smtp) && 
815             /* if... would we ever fit into half of the quota?? */
816             (((MyQItem->ActiveDeliveries * 2)  < max_sessions_for_outbound_smtp)))
817         {
818                 /* abort delivery for another time. */
819                 syslog(LOG_DEBUG,
820                        "SMTP Queue: skipping because of num jobs %d + %ld > %d max_sessions_for_outbound_smtp",
821                        mynumsessions,
822                        MyQItem->ActiveDeliveries,
823                        max_sessions_for_outbound_smtp);
824
825                 FreeQueItem(&MyQItem);
826
827                 return;
828         }
829
830
831         if (MyQItem->ActiveDeliveries > 0)
832         {
833                 int nActivated = 0;
834                 int n = MsgCount++;
835                 int m = MyQItem->ActiveDeliveries;
836                 int i = 1;
837                 Msg = smtp_load_msg(MyQItem, n);
838                 It = GetNewHashPos(MyQItem->MailQEntries, 0);
839                 while ((i <= m) &&
840                        (GetNextHashPos(MyQItem->MailQEntries,
841                                        It, &len, &Key, &vQE)))
842                 {
843                         MailQEntry *ThisItem = vQE;
844
845                         if (ThisItem->Active == 1)
846                         {
847                                 int KeepBuffers = (i == m);
848
849                                 nActivated++;
850                                 if (nActivated % ndelay_count == 0)
851                                         usleep(delay_msec);
852
853                                 if (i > 1) n = MsgCount++;
854                                 syslog(LOG_DEBUG,
855                                        "SMTPQ: Trying <%ld> <%s> %d / %d \n",
856                                        MyQItem->MessageID,
857                                        ChrPtr(ThisItem->Recipient),
858                                        i,
859                                        m);
860                                 smtp_try_one_queue_entry(MyQItem,
861                                                          ThisItem,
862                                                          Msg,
863                                                          KeepBuffers,
864                                                          n,
865                                                          RelayUrls);
866
867                                 if (KeepBuffers) HaveBuffers = 1;
868
869                                 i++;
870                         }
871                 }
872                 DeleteHashPos(&It);
873         }
874         else
875         {
876                 It = GetNewHashPos(MyQItem->MailQEntries, 0);
877                 pthread_mutex_lock(&ActiveQItemsLock);
878                 {
879                         if (GetHashPosFromKey(ActiveQItems,
880                                               LKEY(MyQItem->MessageID),
881                                               It))
882                         {
883                                 DeleteEntryFromHash(ActiveQItems, It);
884                         }
885                         else
886                         {
887                                 long len;
888                                 const char* Key;
889                                 void *VData;
890
891                                 syslog(LOG_WARNING,
892                                        "SMTP cleanup: unable to find "
893                                        "QItem with ID[%ld]",
894                                        MyQItem->MessageID);
895                                 while (GetNextHashPos(ActiveQItems,
896                                                       It,
897                                                       &len,
898                                                       &Key,
899                                                       &VData))
900                                 {
901                                         syslog(LOG_WARNING,
902                                                "SMTP cleanup: have: ID[%ld]",
903                                               ((OneQueItem *)VData)->MessageID);
904                                 }
905                         }
906
907                 }
908                 pthread_mutex_unlock(&ActiveQItemsLock);
909                 DeleteHashPos(&It);
910                 ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
911
912 // TODO: bounce & delete?
913
914         }
915         if (!HaveBuffers) {
916                 FreeStrBuf (&Msg);
917 // TODO : free RelayUrls
918         }
919 }
920
921
922
923 /*
924  * smtp_queue_thread()
925  *
926  * Run through the queue sending out messages.
927  */
928 void smtp_do_queue(void) {
929         static int is_running = 0;
930         int num_processed = 0;
931
932         if (is_running)
933                 return; /* Concurrency check - only one can run */
934         is_running = 1;
935
936         pthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
937         syslog(LOG_INFO, "SMTP client: processing outbound queue");
938
939         if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) {
940                 syslog(LOG_ERR, "Cannot find room <%s>", SMTP_SPOOLOUT_ROOM);
941         }
942         else {
943                 num_processed = CtdlForEachMessage(MSGS_ALL,
944                                                    0L,
945                                                    NULL,
946                                                    SPOOLMIME,
947                                                    NULL,
948                                                    smtp_do_procmsg,
949                                                    NULL);
950         }
951         syslog(LOG_INFO,
952                "SMTP client: queue run completed; %d messages processed",
953                num_processed);
954
955         run_queue_now = 0;
956         is_running = 0;
957 }
958
959
960
961 /*
962  * Initialize the SMTP outbound queue
963  */
964 void smtp_init_spoolout(void) {
965         struct ctdlroom qrbuf;
966
967         /*
968          * Create the room.  This will silently fail if the room already
969          * exists, and that's perfectly ok, because we want it to exist.
970          */
971         CtdlCreateRoom(SMTP_SPOOLOUT_ROOM, 3, "", 0, 1, 0, VIEW_QUEUE);
972
973         /*
974          * Make sure it's set to be a "system room" so it doesn't show up
975          * in the <K>nown rooms list for Aides.
976          */
977         if (CtdlGetRoomLock(&qrbuf, SMTP_SPOOLOUT_ROOM) == 0) {
978                 qrbuf.QRflags2 |= QR2_SYSTEM;
979                 CtdlPutRoomLock(&qrbuf);
980         }
981 }
982
983
984
985
986 /*****************************************************************************/
987 /*                          SMTP UTILITY COMMANDS                            */
988 /*****************************************************************************/
989
990 void cmd_smtp(char *argbuf) {
991         char cmd[64];
992         char node[256];
993         char buf[1024];
994         int i;
995         int num_mxhosts;
996
997         if (CtdlAccessCheck(ac_aide)) return;
998
999         extract_token(cmd, argbuf, 0, '|', sizeof cmd);
1000
1001         if (!strcasecmp(cmd, "mx")) {
1002                 extract_token(node, argbuf, 1, '|', sizeof node);
1003                 num_mxhosts = getmx(buf, node);
1004                 cprintf("%d %d MX hosts listed for %s\n",
1005                         LISTING_FOLLOWS, num_mxhosts, node);
1006                 for (i=0; i<num_mxhosts; ++i) {
1007                         extract_token(node, buf, i, '|', sizeof node);
1008                         cprintf("%s\n", node);
1009                 }
1010                 cprintf("000\n");
1011                 return;
1012         }
1013
1014         else if (!strcasecmp(cmd, "runqueue")) {
1015                 run_queue_now = 1;
1016                 cprintf("%d All outbound SMTP will be retried now.\n", CIT_OK);
1017                 return;
1018         }
1019
1020         else {
1021                 cprintf("%d Invalid command.\n", ERROR + ILLEGAL_VALUE);
1022         }
1023
1024 }
1025
1026
1027 CTDL_MODULE_INIT(smtp_queu)
1028 {
1029         char *pstr;
1030
1031         if (!threading)
1032         {
1033                 pstr = getenv("CITSERVER_n_session_max");
1034                 if ((pstr != NULL) && (*pstr != '\0'))
1035                         max_sessions_for_outbound_smtp = atol(pstr); /* how many sessions might be active till we stop adding more smtp jobs */
1036
1037                 pstr = getenv("CITSERVER_smtp_n_delay_count");
1038                 if ((pstr != NULL) && (*pstr != '\0'))
1039                         ndelay_count = atol(pstr); /* every n queued messages we will sleep... */
1040
1041                 pstr = getenv("CITSERVER_smtp_delay");
1042                 if ((pstr != NULL) && (*pstr != '\0'))
1043                         delay_msec = atol(pstr) * 1000; /* this many seconds. */
1044
1045
1046
1047
1048                 CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send");
1049                 ActiveQItems = NewHash(1, lFlathash);
1050                 pthread_mutex_init(&ActiveQItemsLock, NULL);
1051
1052                 QItemHandlers = NewHash(0, NULL);
1053
1054                 Put(QItemHandlers, HKEY("msgid"), QItem_Handle_MsgID, reference_free_handler);
1055                 Put(QItemHandlers, HKEY("envelope_from"), QItem_Handle_EnvelopeFrom, reference_free_handler);
1056                 Put(QItemHandlers, HKEY("retry"), QItem_Handle_retry, reference_free_handler);
1057                 Put(QItemHandlers, HKEY("attempted"), QItem_Handle_Attempted, reference_free_handler);
1058                 Put(QItemHandlers, HKEY("remote"), QItem_Handle_Recipient, reference_free_handler);
1059                 Put(QItemHandlers, HKEY("bounceto"), QItem_Handle_BounceTo, reference_free_handler);
1060                 Put(QItemHandlers, HKEY("source_room"), QItem_Handle_SenderRoom, reference_free_handler);
1061                 Put(QItemHandlers, HKEY("submitted"), QItem_Handle_Submitted, reference_free_handler);
1062                 smtp_init_spoolout();
1063
1064                 CtdlRegisterCleanupHook(smtp_evq_cleanup);
1065
1066                 CtdlRegisterProtoHook(cmd_smtp, "SMTP", "SMTP utility commands");
1067                 CtdlRegisterSessionHook(smtp_do_queue, EVT_TIMER);
1068         }
1069
1070         /* return our Subversion id for the Log */
1071         return "smtpeventclient";
1072 }