2d406f2e348f71b8e84c113f56ccb80e15e50eb1
[citadel.git] / citadel / modules / smtp / serv_smtpqueue.c
1 /*
2  * This module is an SMTP and ESMTP implementation for the Citadel system.
3  * It is compliant with all of the following:
4  *
5  * RFC  821 - Simple Mail Transfer Protocol
6  * RFC  876 - Survey of SMTP Implementations
7  * RFC 1047 - Duplicate messages and SMTP
8  * RFC 1652 - 8 bit MIME
9  * RFC 1869 - Extended Simple Mail Transfer Protocol
10  * RFC 1870 - SMTP Service Extension for Message Size Declaration
11  * RFC 2033 - Local Mail Transfer Protocol
12  * RFC 2197 - SMTP Service Extension for Command Pipelining
13  * RFC 2476 - Message Submission
14  * RFC 2487 - SMTP Service Extension for Secure SMTP over TLS
15  * RFC 2554 - SMTP Service Extension for Authentication
16  * RFC 2821 - Simple Mail Transfer Protocol
17  * RFC 2822 - Internet Message Format
18  * RFC 2920 - SMTP Service Extension for Command Pipelining
19  *  
20  * The VRFY and EXPN commands have been removed from this implementation
21  * because nobody uses these commands anymore, except for spammers.
22  *
23  * Copyright (c) 1998-2012 by the citadel.org team
24  *
25  *  This program is open source software; you can redistribute it and/or modify
26  *  it under the terms of the GNU General Public License version 3.
27  *  
28  *  
29  *
30  *  This program is distributed in the hope that it will be useful,
31  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
32  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
33  *  GNU General Public License for more details.
34  *
35  *  
36  *  
37  *  
38  */
39
40 #include "sysdep.h"
41 #include <stdlib.h>
42 #include <unistd.h>
43 #include <stdio.h>
44 #include <termios.h>
45 #include <fcntl.h>
46 #include <signal.h>
47 #include <pwd.h>
48 #include <errno.h>
49 #include <sys/types.h>
50 #include <syslog.h>
51
52 #if TIME_WITH_SYS_TIME
53 # include <sys/time.h>
54 # include <time.h>
55 #else
56 # if HAVE_SYS_TIME_H
57 #  include <sys/time.h>
58 # else
59 #  include <time.h>
60 # endif
61 #endif
62 #include <sys/wait.h>
63 #include <ctype.h>
64 #include <string.h>
65 #include <limits.h>
66 #include <sys/socket.h>
67 #include <netinet/in.h>
68 #include <arpa/inet.h>
69 #include <libcitadel.h>
70 #include "citadel.h"
71 #include "server.h"
72 #include "citserver.h"
73 #include "support.h"
74 #include "config.h"
75 #include "control.h"
76 #include "user_ops.h"
77 #include "database.h"
78 #include "msgbase.h"
79 #include "internet_addressing.h"
80 #include "genstamp.h"
81 #include "domain.h"
82 #include "clientsocket.h"
83 #include "locate_host.h"
84 #include "citadel_dirs.h"
85
86 #include "ctdl_module.h"
87
88 #include "smtpqueue.h"
89 #include "event_client.h"
90
91
92 struct CitContext smtp_queue_CC;
93 pthread_mutex_t ActiveQItemsLock;
94 HashList *ActiveQItems  = NULL;
95 HashList *QItemHandlers = NULL;
96 int max_sessions_for_outbound_smtp = 500; /* how many sessions might be active till we stop adding more smtp jobs */
97 int ndelay_count = 50; /* every n queued messages we will sleep... */
98 int delay_msec = 5000; /* this many seconds. */
99
100 static const long MaxRetry = SMTP_RETRY_INTERVAL * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2;
101 int MsgCount            = 0;
102 int run_queue_now       = 0;    /* Set to 1 to ignore SMTP send retry times */
103
104 void smtp_try_one_queue_entry(OneQueItem *MyQItem,
105                               MailQEntry *MyQEntry,
106                               StrBuf *MsgText,
107 /* KeepMsgText allows us to use MsgText as ours. */
108                               int KeepMsgText,
109                               int MsgCount,
110                               ParsedURL *RelayUrls);
111
112
113 void smtp_evq_cleanup(void)
114 {
115
116         pthread_mutex_lock(&ActiveQItemsLock);
117         DeleteHash(&QItemHandlers);
118         DeleteHash(&ActiveQItems);
119         pthread_mutex_unlock(&ActiveQItemsLock);
120         pthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
121 /*      citthread_mutex_destroy(&ActiveQItemsLock); TODO */
122 }
123
124 int DecreaseQReference(OneQueItem *MyQItem)
125 {
126         int IDestructQueItem;
127
128         pthread_mutex_lock(&ActiveQItemsLock);
129         MyQItem->ActiveDeliveries--;
130         IDestructQueItem = MyQItem->ActiveDeliveries == 0;
131         pthread_mutex_unlock(&ActiveQItemsLock);
132         return IDestructQueItem;
133 }
134
135 void RemoveQItem(OneQueItem *MyQItem)
136 {
137         long len;
138         const char* Key;
139         void *VData;
140         HashPos  *It;
141
142         pthread_mutex_lock(&ActiveQItemsLock);
143         It = GetNewHashPos(ActiveQItems, 0);
144         if (GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It))
145                 DeleteEntryFromHash(ActiveQItems, It);
146         else
147         {
148                 syslog(LOG_WARNING,
149                        "SMTP cleanup: unable to find QItem with ID[%ld]",
150                        MyQItem->MessageID);
151                 while (GetNextHashPos(ActiveQItems, It, &len, &Key, &VData))
152                         syslog(LOG_WARNING,
153                                "SMTP cleanup: have_: ID[%ld]",
154                                ((OneQueItem *)VData)->MessageID);
155         }
156         pthread_mutex_unlock(&ActiveQItemsLock);
157         DeleteHashPos(&It);
158 }
159
160
161 void FreeMailQEntry(void *qv)
162 {
163         MailQEntry *Q = qv;
164         FreeStrBuf(&Q->Recipient);
165         FreeStrBuf(&Q->StatusMessage);
166         free(Q);
167 }
168 void FreeQueItem(OneQueItem **Item)
169 {
170         DeleteHash(&(*Item)->MailQEntries);
171         FreeStrBuf(&(*Item)->EnvelopeFrom);
172         FreeStrBuf(&(*Item)->BounceTo);
173         FreeURL(&(*Item)->URL);
174         free(*Item);
175         Item = NULL;
176 }
177 void HFreeQueItem(void *Item)
178 {
179         FreeQueItem((OneQueItem**)&Item);
180 }
181
182 /* inspect recipients with a status of:
183  * - 0 (no delivery yet attempted)
184  * - 3/4 (transient errors
185  *        were experienced and it's time to try again)
186  */
187 int CountActiveQueueEntries(OneQueItem *MyQItem)
188 {
189         HashPos  *It;
190         long len;
191         long ActiveDeliveries;
192         const char *Key;
193         void *vQE;
194
195         ActiveDeliveries = 0;
196         It = GetNewHashPos(MyQItem->MailQEntries, 0);
197         while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
198         {
199                 MailQEntry *ThisItem = vQE;
200                 if ((ThisItem->Status == 0) ||
201                     (ThisItem->Status == 3) ||
202                     (ThisItem->Status == 4))
203                 {
204                         ActiveDeliveries++;
205                         ThisItem->Active = 1;
206                 }
207                 else
208                         ThisItem->Active = 0;
209         }
210         DeleteHashPos(&It);
211         return ActiveDeliveries;
212 }
213
214 OneQueItem *DeserializeQueueItem(StrBuf *RawQItem, long QueMsgID)
215 {
216         OneQueItem *Item;
217         const char *pLine = NULL;
218         StrBuf *Line;
219         StrBuf *Token;
220         void *v;
221
222         Item = (OneQueItem*)malloc(sizeof(OneQueItem));
223         memset(Item, 0, sizeof(OneQueItem));
224         Item->Retry = SMTP_RETRY_INTERVAL;
225         Item->MessageID = -1;
226         Item->QueMsgID = QueMsgID;
227
228         Token = NewStrBuf();
229         Line = NewStrBufPlain(NULL, 128);
230         while (pLine != StrBufNOTNULL) {
231                 const char *pItemPart = NULL;
232                 void *vHandler;
233
234                 StrBufExtract_NextToken(Line, RawQItem, &pLine, '\n');
235                 if (StrLength(Line) == 0) continue;
236                 StrBufExtract_NextToken(Token, Line, &pItemPart, '|');
237                 if (GetHash(QItemHandlers, SKEY(Token), &vHandler))
238                 {
239                         QItemHandler H;
240                         H = (QItemHandler) vHandler;
241                         H(Item, Line, &pItemPart);
242                 }
243         }
244         FreeStrBuf(&Line);
245         FreeStrBuf(&Token);
246
247         if (Item->Retry >= MaxRetry)
248                 Item->FailNow = 1;
249
250         pthread_mutex_lock(&ActiveQItemsLock);
251         if (GetHash(ActiveQItems,
252                     LKEY(Item->MessageID),
253                     &v))
254         {
255                 /* WHOOPS. somebody else is already working on this. */
256                 pthread_mutex_unlock(&ActiveQItemsLock);
257                 FreeQueItem(&Item);
258                 return NULL;
259         }
260         else {
261                 /* mark our claim on this. */
262                 Put(ActiveQItems,
263                     LKEY(Item->MessageID),
264                     Item,
265                     HFreeQueItem);
266                 pthread_mutex_unlock(&ActiveQItemsLock);
267         }
268
269         return Item;
270 }
271
272 StrBuf *SerializeQueueItem(OneQueItem *MyQItem)
273 {
274         StrBuf *QMessage;
275         HashPos  *It;
276         const char *Key;
277         long len;
278         void *vQE;
279
280         QMessage = NewStrBufPlain(NULL, SIZ);
281         StrBufPrintf(QMessage, "Content-type: %s\n", SPOOLMIME);
282 //      "attempted|%ld\n"  "retry|%ld\n",, (long)time(NULL), (long)retry );
283         StrBufAppendBufPlain(QMessage, HKEY("\nmsgid|"), 0);
284         StrBufAppendPrintf(QMessage, "%ld", MyQItem->MessageID);
285
286         StrBufAppendBufPlain(QMessage, HKEY("\nsubmitted|"), 0);
287         StrBufAppendPrintf(QMessage, "%ld", MyQItem->Submitted);
288
289         if (StrLength(MyQItem->BounceTo) > 0) {
290                 StrBufAppendBufPlain(QMessage, HKEY("\nbounceto|"), 0);
291                 StrBufAppendBuf(QMessage, MyQItem->BounceTo, 0);
292         }
293
294         if (StrLength(MyQItem->EnvelopeFrom) > 0) {
295                 StrBufAppendBufPlain(QMessage, HKEY("\nenvelope_from|"), 0);
296                 StrBufAppendBuf(QMessage, MyQItem->EnvelopeFrom, 0);
297         }
298
299         StrBufAppendBufPlain(QMessage, HKEY("\nretry|"), 0);
300         StrBufAppendPrintf(QMessage, "%ld",
301                            MyQItem->Retry);
302
303         StrBufAppendBufPlain(QMessage, HKEY("\nattempted|"), 0);
304         StrBufAppendPrintf(QMessage, "%ld",
305                            MyQItem->ReattemptWhen);
306
307         It = GetNewHashPos(MyQItem->MailQEntries, 0);
308         while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
309         {
310                 MailQEntry *ThisItem = vQE;
311
312                 if (!ThisItem->Active)
313                 {
314                         /* skip already sent ones from the spoolfile. */
315                         continue;
316                 }
317                 StrBufAppendBufPlain(QMessage, HKEY("\nremote|"), 0);
318                 StrBufAppendBuf(QMessage, ThisItem->Recipient, 0);
319                 StrBufAppendBufPlain(QMessage, HKEY("|"), 0);
320                 StrBufAppendPrintf(QMessage, "%d", ThisItem->Status);
321                 StrBufAppendBufPlain(QMessage, HKEY("|"), 0);
322                 StrBufAppendBuf(QMessage, ThisItem->StatusMessage, 0);
323         }
324         DeleteHashPos(&It);
325         StrBufAppendBufPlain(QMessage, HKEY("\n"), 0);
326         return QMessage;
327 }
328
329
330
331
332
333 void NewMailQEntry(OneQueItem *Item)
334 {
335         Item->Current = (MailQEntry*) malloc(sizeof(MailQEntry));
336         memset(Item->Current, 0, sizeof(MailQEntry));
337
338         if (Item->MailQEntries == NULL)
339                 Item->MailQEntries = NewHash(1, Flathash);
340         Item->Current->StatusMessage = NewStrBuf();
341         Item->Current->n = GetCount(Item->MailQEntries);
342         Put(Item->MailQEntries,
343             IKEY(Item->Current->n),
344             Item->Current,
345             FreeMailQEntry);
346 }
347
348 void QItem_Handle_MsgID(OneQueItem *Item, StrBuf *Line, const char **Pos)
349 {
350         Item->MessageID = StrBufExtractNext_long(Line, Pos, '|');
351 }
352
353 void QItem_Handle_EnvelopeFrom(OneQueItem *Item, StrBuf *Line, const char **Pos)
354 {
355         if (Item->EnvelopeFrom == NULL)
356                 Item->EnvelopeFrom = NewStrBufPlain(NULL, StrLength(Line));
357         StrBufExtract_NextToken(Item->EnvelopeFrom, Line, Pos, '|');
358 }
359
360 void QItem_Handle_BounceTo(OneQueItem *Item, StrBuf *Line, const char **Pos)
361 {
362         if (Item->BounceTo == NULL)
363                 Item->BounceTo = NewStrBufPlain(NULL, StrLength(Line));
364         StrBufExtract_NextToken(Item->BounceTo, Line, Pos, '|');
365 }
366
367 void QItem_Handle_Recipient(OneQueItem *Item, StrBuf *Line, const char **Pos)
368 {
369         if (Item->Current == NULL)
370                 NewMailQEntry(Item);
371         if (Item->Current->Recipient == NULL)
372                 Item->Current->Recipient=NewStrBufPlain(NULL, StrLength(Line));
373         StrBufExtract_NextToken(Item->Current->Recipient, Line, Pos, '|');
374         Item->Current->Status = StrBufExtractNext_int(Line, Pos, '|');
375         StrBufExtract_NextToken(Item->Current->StatusMessage, Line, Pos, '|');
376         Item->Current = NULL; // TODO: is this always right?
377 }
378
379
380 void QItem_Handle_retry(OneQueItem *Item, StrBuf *Line, const char **Pos)
381 {
382         Item->Retry =
383                 StrBufExtractNext_int(Line, Pos, '|');
384         Item->Retry *= 2;
385 }
386
387
388 void QItem_Handle_Submitted(OneQueItem *Item, StrBuf *Line, const char **Pos)
389 {
390         Item->Submitted = atol(*Pos);
391
392 }
393
394 void QItem_Handle_Attempted(OneQueItem *Item, StrBuf *Line, const char **Pos)
395 {
396         Item->ReattemptWhen = StrBufExtractNext_int(Line, Pos, '|');
397 }
398
399
400
401 /**
402  * this one has to have the context for loading the message via the redirect buffer...
403  */
404 StrBuf *smtp_load_msg(OneQueItem *MyQItem, int n)
405 {
406         CitContext *CCC=CC;
407         StrBuf *SendMsg;
408
409         CCC->redirect_buffer = NewStrBufPlain(NULL, SIZ);
410         CtdlOutputMsg(MyQItem->MessageID,
411                       MT_RFC822, HEADERS_ALL,
412                       0, 1, NULL,
413                       (ESC_DOT|SUPPRESS_ENV_TO) );
414
415         SendMsg = CCC->redirect_buffer;
416         CCC->redirect_buffer = NULL;
417         if ((StrLength(SendMsg) > 0) &&
418             ChrPtr(SendMsg)[StrLength(SendMsg) - 1] != '\n') {
419                 syslog(LOG_WARNING,
420                        "SMTP client[%d]: Possible problem: message did not "
421                        "correctly terminate. (expecting 0x10, got 0x%02x)\n",
422                        MsgCount, //yes uncool, but best choice here...
423                        ChrPtr(SendMsg)[StrLength(SendMsg) - 1] );
424                 StrBufAppendBufPlain(SendMsg, HKEY("\r\n"), 0);
425         }
426         return SendMsg;
427 }
428
429
430
431 /*
432  * smtp_do_bounce() is caled by smtp_do_procmsg() to scan a set of delivery
433  * instructions for "5" codes (permanent fatal errors) and produce/deliver
434  * a "bounce" message (delivery status notification).
435  */
436 void smtpq_do_bounce(OneQueItem *MyQItem, StrBuf *OMsgTxt)
437 {
438         static int seq = 0;
439
440         struct CtdlMessage *bmsg = NULL;
441         StrBuf *boundary;
442         StrBuf *Msg = NULL;
443         StrBuf *BounceMB;
444         struct recptypes *valid;
445
446         HashPos *It;
447         void *vQE;
448         long len;
449         const char *Key;
450
451         int successful_bounce = 0;
452         int num_bounces = 0;
453         int give_up = 0;
454
455         syslog(LOG_DEBUG, "smtp_do_bounce() called\n");
456
457         if ( (ev_time() - MyQItem->Submitted) > SMTP_GIVE_UP ) {
458                 give_up = 1;/// TODO: replace time by libevq timer get
459         }
460
461         /*
462          * Now go through the instructions checking for stuff.
463          */
464         It = GetNewHashPos(MyQItem->MailQEntries, 0);
465         while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
466         {
467                 MailQEntry *ThisItem = vQE;
468                 if ((ThisItem->Status == 5) || /* failed now? */
469                     ((give_up == 1) &&
470                      (ThisItem->Status != 2)))
471                         /* giving up after failed attempts... */
472                 {
473                         if (num_bounces == 0)
474                                 Msg = NewStrBufPlain(NULL, 1024);
475                         ++num_bounces;
476
477                         StrBufAppendBuf(Msg, ThisItem->Recipient, 0);
478                         StrBufAppendBufPlain(Msg, HKEY(": "), 0);
479                         StrBufAppendBuf(Msg, ThisItem->StatusMessage, 0);
480                         StrBufAppendBufPlain(Msg, HKEY("\r\n"), 0);
481                 }
482         }
483         DeleteHashPos(&It);
484
485         /* Deliver the bounce if there's anything worth mentioning */
486         syslog(LOG_DEBUG, "num_bounces = %d\n", num_bounces);
487
488         if (num_bounces == 0) {
489                 FreeStrBuf(&Msg);
490                 return;
491         }
492
493         boundary = NewStrBufPlain(HKEY("=_Citadel_Multipart_"));
494         StrBufAppendPrintf(boundary,
495                            "%s_%04x%04x",
496                            config.c_fqdn,
497                            getpid(),
498                            ++seq);
499
500         /* Start building our bounce message; go shopping for memory first. */
501         BounceMB = NewStrBufPlain(
502                 NULL,
503                 1024 + /* mime stuff.... */
504                 StrLength(Msg) +  /* the bounce information... */
505                 StrLength(OMsgTxt)); /* the original message */
506         if (BounceMB == NULL) {
507                 FreeStrBuf(&boundary);
508                 syslog(LOG_ERR, "Failed to alloc() bounce message.\n");
509
510                 return;
511         }
512
513         bmsg = (struct CtdlMessage *) malloc(sizeof(struct CtdlMessage));
514         if (bmsg == NULL) {
515                 FreeStrBuf(&boundary);
516                 FreeStrBuf(&BounceMB);
517                 syslog(LOG_ERR, "Failed to alloc() bounce message.\n");
518
519                 return;
520         }
521         memset(bmsg, 0, sizeof(struct CtdlMessage));
522
523
524         StrBufAppendBufPlain(BounceMB, HKEY("Content-type: multipart/mixed; boundary=\""), 0);
525         StrBufAppendBuf(BounceMB, boundary, 0);
526         StrBufAppendBufPlain(BounceMB, HKEY("\"\r\n"), 0);
527         StrBufAppendBufPlain(BounceMB, HKEY("MIME-Version: 1.0\r\n"), 0);
528         StrBufAppendBufPlain(BounceMB, HKEY("X-Mailer: " CITADEL "\r\n"), 0);
529         StrBufAppendBufPlain(BounceMB, HKEY("\r\nThis is a multipart message in MIME format.\r\n\r\n"), 0);
530         StrBufAppendBufPlain(BounceMB, HKEY("--"), 0);
531         StrBufAppendBuf(BounceMB, boundary, 0);
532         StrBufAppendBufPlain(BounceMB, HKEY("\r\n"), 0);
533         StrBufAppendBufPlain(BounceMB, HKEY("Content-type: text/plain\r\n\r\n"), 0);
534
535         if (give_up)
536                 StrBufAppendBufPlain(
537                         BounceMB,
538                         HKEY(
539                                 "A message you sent could not be delivered "
540                                 "to some or all of its recipients\n"
541                                 "due to prolonged unavailability "
542                                 "of its destination(s).\n"
543                                 "Giving up on the following addresses:\n\n"
544                                 ), 0);
545         else
546                 StrBufAppendBufPlain(
547                         BounceMB,
548                         HKEY(
549                                 "A message you sent could not be delivered "
550                                 "to some or all of its recipients.\n"
551                                 "The following addresses "
552                                 "were undeliverable:\n\n"
553                                 ), 0);
554
555         StrBufAppendBuf(BounceMB, Msg, 0);
556         FreeStrBuf(&Msg);
557
558         /* Attach the original message */
559         StrBufAppendBufPlain(BounceMB, HKEY("--"), 0);
560         StrBufAppendBuf(BounceMB, boundary, 0);
561         StrBufAppendBufPlain(BounceMB, HKEY("\r\n"), 0);
562         StrBufAppendBufPlain(BounceMB,
563                              HKEY("Content-type: message/rfc822\r\n"), 0);
564         StrBufAppendBufPlain(BounceMB,
565                              HKEY("Content-Transfer-Encoding: 7bit\r\n"), 0);
566         StrBufAppendBufPlain(BounceMB,
567                              HKEY("Content-Disposition: inline\r\n"), 0);
568         StrBufAppendBufPlain(BounceMB, HKEY("\r\n"), 0);
569         StrBufAppendBuf(BounceMB, OMsgTxt, 0);
570
571         /* Close the multipart MIME scope */
572         StrBufAppendBufPlain(BounceMB, HKEY("--"), 0);
573         StrBufAppendBuf(BounceMB, boundary, 0);
574         StrBufAppendBufPlain(BounceMB, HKEY("--\r\n"), 0);
575
576         bmsg->cm_magic = CTDLMESSAGE_MAGIC;
577         bmsg->cm_anon_type = MES_NORMAL;
578         bmsg->cm_format_type = FMT_RFC822;
579
580         bmsg->cm_fields['O'] = strdup(MAILROOM);
581         bmsg->cm_fields['A'] = strdup("Citadel");
582         bmsg->cm_fields['N'] = strdup(config.c_nodename);
583         bmsg->cm_fields['U'] = strdup("Delivery Status Notification (Failure)");
584         bmsg->cm_fields['M'] = SmashStrBuf(&BounceMB);
585
586         /* First try the user who sent the message */
587         if (StrLength(MyQItem->BounceTo) == 0)
588                 syslog(LOG_ERR, "No bounce address specified\n");
589         else
590                 syslog(LOG_DEBUG, "bounce to user? <%s>\n",
591                        ChrPtr(MyQItem->BounceTo));
592
593         /* Can we deliver the bounce to the original sender? */
594         valid = validate_recipients(ChrPtr(MyQItem->BounceTo), NULL, 0);
595         if ((valid != NULL) && (valid->num_error == 0)) {
596                 CtdlSubmitMsg(bmsg, valid, "", QP_EADDR);
597                 successful_bounce = 1;
598         }
599
600         /* If not, post it in the Aide> room */
601         if (successful_bounce == 0) {
602                 CtdlSubmitMsg(bmsg, NULL, config.c_aideroom, QP_EADDR);
603         }
604
605         /* Free up the memory we used */
606         free_recipients(valid);
607         FreeStrBuf(&boundary);
608         CtdlFreeMessage(bmsg);
609         syslog(LOG_DEBUG, "Done processing bounces\n");
610 }
611
612 /*
613  * smtp_do_procmsg()
614  *
615  * Called by smtp_do_queue() to handle an individual message.
616  */
617 void smtp_do_procmsg(long msgnum, void *userdata) {
618         int mynumsessions = num_sessions;
619         struct CtdlMessage *msg = NULL;
620         char *instr = NULL;
621         StrBuf *PlainQItem;
622         OneQueItem *MyQItem;
623         char *pch;
624         HashPos  *It;
625         void *vQE;
626         long len;
627         const char *Key;
628         int nRelays = 0;
629         ParsedURL *RelayUrls = NULL;
630         int HaveBuffers = 0;
631         StrBuf *Msg =NULL;
632
633         if (mynumsessions > max_sessions_for_outbound_smtp) {
634                 syslog(LOG_DEBUG,
635                        "SMTP Queue: skipping because of num jobs %d > %d max_sessions_for_outbound_smtp",
636                        mynumsessions,
637                        max_sessions_for_outbound_smtp);
638         }
639
640         syslog(LOG_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum);
641         ///strcpy(envelope_from, "");
642
643         msg = CtdlFetchMessage(msgnum, 1);
644         if (msg == NULL) {
645                 syslog(LOG_ERR, "SMTP Queue: tried %ld but no such message!\n",
646                        msgnum);
647                 return;
648         }
649
650         pch = instr = msg->cm_fields['M'];
651
652         /* Strip out the headers (no not amd any other non-instruction) line */
653         while (pch != NULL) {
654                 pch = strchr(pch, '\n');
655                 if ((pch != NULL) && (*(pch + 1) == '\n')) {
656                         instr = pch + 2;
657                         pch = NULL;
658                 }
659         }
660         PlainQItem = NewStrBufPlain(instr, -1);
661         CtdlFreeMessage(msg);
662         MyQItem = DeserializeQueueItem(PlainQItem, msgnum);
663         FreeStrBuf(&PlainQItem);
664
665         if (MyQItem == NULL) {
666                 syslog(LOG_ERR,
667                        "SMTP Queue: Msg No %ld: already in progress!\n",
668                        msgnum);
669                 return; /* s.b. else is already processing... */
670         }
671
672         /*
673          * Postpone delivery if we've already tried recently.
674          */
675         if ((MyQItem->ReattemptWhen != 0) && 
676             (time(NULL) < MyQItem->ReattemptWhen) &&
677             (run_queue_now == 0))
678         {
679                 syslog(LOG_DEBUG, "SMTP client: Retry time not yet reached.\n");
680
681                 It = GetNewHashPos(MyQItem->MailQEntries, 0);
682                 pthread_mutex_lock(&ActiveQItemsLock);
683                 {
684                         if (GetHashPosFromKey(ActiveQItems,
685                                               LKEY(MyQItem->MessageID),
686                                               It))
687                         {
688                                 DeleteEntryFromHash(ActiveQItems, It);
689                         }
690                 }
691                 pthread_mutex_unlock(&ActiveQItemsLock);
692                 ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
693                 DeleteHashPos(&It);
694                 return;
695         }
696
697         /*
698          * Bail out if there's no actual message associated with this
699          */
700         if (MyQItem->MessageID < 0L) {
701                 syslog(LOG_ERR, "SMTP Queue: no 'msgid' directive found!\n");
702                 It = GetNewHashPos(MyQItem->MailQEntries, 0);
703                 pthread_mutex_lock(&ActiveQItemsLock);
704                 {
705                         if (GetHashPosFromKey(ActiveQItems,
706                                               LKEY(MyQItem->MessageID),
707                                               It))
708                         {
709                                 DeleteEntryFromHash(ActiveQItems, It);
710                         }
711                 }
712                 pthread_mutex_unlock(&ActiveQItemsLock);
713                 DeleteHashPos(&It);
714                 ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
715                 return;
716         }
717
718         {
719                 char mxbuf[SIZ];
720                 ParsedURL **Url = &MyQItem->URL;
721                 nRelays = get_hosts(mxbuf, "smarthost");
722                 if (nRelays > 0) {
723                         StrBuf *All;
724                         StrBuf *One;
725                         const char *Pos = NULL;
726                         All = NewStrBufPlain(mxbuf, -1);
727                         One = NewStrBufPlain(NULL, StrLength(All) + 1);
728
729                         while ((Pos != StrBufNOTNULL) &&
730                                ((Pos == NULL) ||
731                                 !IsEmptyStr(Pos)))
732                         {
733                                 StrBufExtract_NextToken(One, All, &Pos, '|');
734                                 if (!ParseURL(Url, One, 25))
735                                         syslog(LOG_DEBUG,
736                                                "Failed to parse: %s\n",
737                                                ChrPtr(One));
738                                 else {
739                                         ///if (!Url->IsIP)) // todo dupe me fork ipv6
740                                         Url = &(*Url)->Next;
741                                 }
742                         }
743                         FreeStrBuf(&All);
744                         FreeStrBuf(&One);
745                 }
746
747                 Url = &MyQItem->FallBackHost;
748                 nRelays = get_hosts(mxbuf, "fallbackhost");
749                 if (nRelays > 0) {
750                         StrBuf *All;
751                         StrBuf *One;
752                         const char *Pos = NULL;
753                         All = NewStrBufPlain(mxbuf, -1);
754                         One = NewStrBufPlain(NULL, StrLength(All) + 1);
755
756                         while ((Pos != StrBufNOTNULL) &&
757                                ((Pos == NULL) ||
758                                 !IsEmptyStr(Pos)))
759                         {
760                                 StrBufExtract_NextToken(One, All, &Pos, '|');
761                                 if (!ParseURL(Url, One, 25))
762                                         syslog(LOG_DEBUG,
763                                                "Failed to parse: %s\n",
764                                                ChrPtr(One));
765                                 else
766                                         Url = &(*Url)->Next;
767                         }
768                         FreeStrBuf(&All);
769                         FreeStrBuf(&One);
770                 }
771         }
772
773         It = GetNewHashPos(MyQItem->MailQEntries, 0);
774         while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
775         {
776                 MailQEntry *ThisItem = vQE;
777                 syslog(LOG_DEBUG, "SMTP Queue: Task: <%s> %d\n",
778                        ChrPtr(ThisItem->Recipient),
779                        ThisItem->Active);
780         }
781         DeleteHashPos(&It);
782
783         MyQItem->ActiveDeliveries = CountActiveQueueEntries(MyQItem);
784
785         /* failsafe against overload: 
786          * will we exceed the limit set? 
787          */
788         if ((MyQItem->ActiveDeliveries + mynumsessions > max_sessions_for_outbound_smtp) && 
789             /* if yes, did we reach more than half of the quota? */
790             ((mynumsessions * 2) > max_sessions_for_outbound_smtp) && 
791             /* if... would we ever fit into half of the quota?? */
792             (((MyQItem->ActiveDeliveries * 2)  < max_sessions_for_outbound_smtp)))
793         {
794                 /* abort delivery for another time. */
795                 syslog(LOG_DEBUG,
796                        "SMTP Queue: skipping because of num jobs %d + %ld > %d max_sessions_for_outbound_smtp",
797                        mynumsessions,
798                        MyQItem->ActiveDeliveries,
799                        max_sessions_for_outbound_smtp);
800
801                 FreeQueItem(&MyQItem);
802
803                 return;
804         }
805
806
807         if (MyQItem->ActiveDeliveries > 0)
808         {
809                 int nActivated = 0;
810                 int n = MsgCount++;
811                 int m = MyQItem->ActiveDeliveries;
812                 int i = 1;
813                 Msg = smtp_load_msg(MyQItem, n);
814                 It = GetNewHashPos(MyQItem->MailQEntries, 0);
815                 while ((i <= m) &&
816                        (GetNextHashPos(MyQItem->MailQEntries,
817                                        It, &len, &Key, &vQE)))
818                 {
819                         MailQEntry *ThisItem = vQE;
820
821                         if (ThisItem->Active == 1)
822                         {
823                                 int KeepBuffers = (i == m);
824
825                                 nActivated++;
826                                 if (nActivated % ndelay_count == 0)
827                                         usleep(delay_msec);
828
829                                 if (i > 1) n = MsgCount++;
830                                 syslog(LOG_DEBUG,
831                                        "SMTPQ: Trying <%ld> <%s> %d / %d \n",
832                                        MyQItem->MessageID,
833                                        ChrPtr(ThisItem->Recipient),
834                                        i,
835                                        m);
836                                 smtp_try_one_queue_entry(MyQItem,
837                                                          ThisItem,
838                                                          Msg,
839                                                          KeepBuffers,
840                                                          n,
841                                                          RelayUrls);
842
843                                 if (KeepBuffers) HaveBuffers = 1;
844
845                                 i++;
846                         }
847                 }
848                 DeleteHashPos(&It);
849         }
850         else
851         {
852                 It = GetNewHashPos(MyQItem->MailQEntries, 0);
853                 pthread_mutex_lock(&ActiveQItemsLock);
854                 {
855                         if (GetHashPosFromKey(ActiveQItems,
856                                               LKEY(MyQItem->MessageID),
857                                               It))
858                         {
859                                 DeleteEntryFromHash(ActiveQItems, It);
860                         }
861                         else
862                         {
863                                 long len;
864                                 const char* Key;
865                                 void *VData;
866
867                                 syslog(LOG_WARNING,
868                                        "SMTP cleanup: unable to find "
869                                        "QItem with ID[%ld]",
870                                        MyQItem->MessageID);
871                                 while (GetNextHashPos(ActiveQItems,
872                                                       It,
873                                                       &len,
874                                                       &Key,
875                                                       &VData))
876                                 {
877                                         syslog(LOG_WARNING,
878                                                "SMTP cleanup: have: ID[%ld]",
879                                               ((OneQueItem *)VData)->MessageID);
880                                 }
881                         }
882
883                 }
884                 pthread_mutex_unlock(&ActiveQItemsLock);
885                 DeleteHashPos(&It);
886                 ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
887
888 // TODO: bounce & delete?
889
890         }
891         if (!HaveBuffers) {
892                 FreeStrBuf (&Msg);
893 // TODO : free RelayUrls
894         }
895 }
896
897
898
899 /*
900  * smtp_queue_thread()
901  *
902  * Run through the queue sending out messages.
903  */
904 void smtp_do_queue(void) {
905         static int is_running = 0;
906         int num_processed = 0;
907
908         if (is_running)
909                 return; /* Concurrency check - only one can run */
910         is_running = 1;
911
912         pthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
913         syslog(LOG_INFO, "SMTP client: processing outbound queue");
914
915         if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) {
916                 syslog(LOG_ERR, "Cannot find room <%s>", SMTP_SPOOLOUT_ROOM);
917         }
918         else {
919                 num_processed = CtdlForEachMessage(MSGS_ALL,
920                                                    0L,
921                                                    NULL,
922                                                    SPOOLMIME,
923                                                    NULL,
924                                                    smtp_do_procmsg,
925                                                    NULL);
926         }
927         syslog(LOG_INFO,
928                "SMTP client: queue run completed; %d messages processed",
929                num_processed);
930
931         run_queue_now = 0;
932         is_running = 0;
933 }
934
935
936
937 /*
938  * Initialize the SMTP outbound queue
939  */
940 void smtp_init_spoolout(void) {
941         struct ctdlroom qrbuf;
942
943         /*
944          * Create the room.  This will silently fail if the room already
945          * exists, and that's perfectly ok, because we want it to exist.
946          */
947         CtdlCreateRoom(SMTP_SPOOLOUT_ROOM, 3, "", 0, 1, 0, VIEW_QUEUE);
948
949         /*
950          * Make sure it's set to be a "system room" so it doesn't show up
951          * in the <K>nown rooms list for Aides.
952          */
953         if (CtdlGetRoomLock(&qrbuf, SMTP_SPOOLOUT_ROOM) == 0) {
954                 qrbuf.QRflags2 |= QR2_SYSTEM;
955                 CtdlPutRoomLock(&qrbuf);
956         }
957 }
958
959
960
961
962 /*****************************************************************************/
963 /*                          SMTP UTILITY COMMANDS                            */
964 /*****************************************************************************/
965
966 void cmd_smtp(char *argbuf) {
967         char cmd[64];
968         char node[256];
969         char buf[1024];
970         int i;
971         int num_mxhosts;
972
973         if (CtdlAccessCheck(ac_aide)) return;
974
975         extract_token(cmd, argbuf, 0, '|', sizeof cmd);
976
977         if (!strcasecmp(cmd, "mx")) {
978                 extract_token(node, argbuf, 1, '|', sizeof node);
979                 num_mxhosts = getmx(buf, node);
980                 cprintf("%d %d MX hosts listed for %s\n",
981                         LISTING_FOLLOWS, num_mxhosts, node);
982                 for (i=0; i<num_mxhosts; ++i) {
983                         extract_token(node, buf, i, '|', sizeof node);
984                         cprintf("%s\n", node);
985                 }
986                 cprintf("000\n");
987                 return;
988         }
989
990         else if (!strcasecmp(cmd, "runqueue")) {
991                 run_queue_now = 1;
992                 cprintf("%d All outbound SMTP will be retried now.\n", CIT_OK);
993                 return;
994         }
995
996         else {
997                 cprintf("%d Invalid command.\n", ERROR + ILLEGAL_VALUE);
998         }
999
1000 }
1001
1002
1003 CTDL_MODULE_INIT(smtp_queu)
1004 {
1005         char *pstr;
1006
1007         if (!threading)
1008         {
1009                 pstr = getenv("CITSERVER_n_session_max");
1010                 if ((pstr != NULL) && (*pstr != '\0'))
1011                         max_sessions_for_outbound_smtp = atol(pstr); /* how many sessions might be active till we stop adding more smtp jobs */
1012
1013                 pstr = getenv("CITSERVER_smtp_n_delay_count");
1014                 if ((pstr != NULL) && (*pstr != '\0'))
1015                         ndelay_count = atol(pstr); /* every n queued messages we will sleep... */
1016
1017                 pstr = getenv("CITSERVER_smtp_delay");
1018                 if ((pstr != NULL) && (*pstr != '\0'))
1019                         delay_msec = atol(pstr) * 1000; /* this many seconds. */
1020
1021
1022
1023
1024                 CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send");
1025                 ActiveQItems = NewHash(1, lFlathash);
1026                 pthread_mutex_init(&ActiveQItemsLock, NULL);
1027
1028                 QItemHandlers = NewHash(0, NULL);
1029
1030                 Put(QItemHandlers, HKEY("msgid"), QItem_Handle_MsgID, reference_free_handler);
1031                 Put(QItemHandlers, HKEY("envelope_from"), QItem_Handle_EnvelopeFrom, reference_free_handler);
1032                 Put(QItemHandlers, HKEY("retry"), QItem_Handle_retry, reference_free_handler);
1033                 Put(QItemHandlers, HKEY("attempted"), QItem_Handle_Attempted, reference_free_handler);
1034                 Put(QItemHandlers, HKEY("remote"), QItem_Handle_Recipient, reference_free_handler);
1035                 Put(QItemHandlers, HKEY("bounceto"), QItem_Handle_BounceTo, reference_free_handler);
1036                 Put(QItemHandlers, HKEY("submitted"), QItem_Handle_Submitted, reference_free_handler);
1037
1038                 smtp_init_spoolout();
1039
1040                 CtdlRegisterCleanupHook(smtp_evq_cleanup);
1041
1042                 CtdlRegisterProtoHook(cmd_smtp, "SMTP", "SMTP utility commands");
1043                 CtdlRegisterSessionHook(smtp_do_queue, EVT_TIMER);
1044         }
1045
1046         /* return our Subversion id for the Log */
1047         return "smtpeventclient";
1048 }