From c8e73a84d645c9806635b272df36ec85cb5bd427 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sat, 8 Jan 2011 20:55:44 +0100 Subject: [PATCH] libev smtp migration - pass message text into bounce message - split SMTP <-> SMTP-Queue code --- .../modules/eventclient/serv_eventclient.c | 27 +- citadel/modules/smtp/serv_smtpeventclient.c | 945 ++++-------------- citadel/modules/smtp/serv_smtpqueue.c | 670 +++++++++++++ citadel/modules/smtp/smtp_util.c | 16 +- citadel/modules/smtp/smtp_util.h | 2 +- citadel/modules/smtp/smtpqueue.h | 37 + 6 files changed, 915 insertions(+), 782 deletions(-) create mode 100644 citadel/modules/smtp/serv_smtpqueue.c create mode 100644 citadel/modules/smtp/smtpqueue.h diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index a200754dc..a858048da 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -104,14 +104,11 @@ static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int reve /// TODO: add it to QueueEvents break; case 'x': - /////event_del(&queue_add_event); close(event_add_pipe[0]); /// TODO; flush QueueEvents fd's and delete it. ev_io_stop(event_base, &queue_add_event); ev_unloop(event_base, EVUNLOOP_ALL); } - /* Unblock the other side */ -// read(fd, buf, 1); CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n"); } @@ -120,12 +117,6 @@ void InitEventQueue(void) { struct rlimit LimitSet; -/// event_base = ev_default_loop(0); -/* - base = event_base_new(); - if (!base) - return NULL; / *XXXerr*/ - citthread_mutex_init(&EventQueueMutex, NULL); if (pipe(event_add_pipe) != 0) { @@ -150,25 +141,11 @@ void *client_event_thread(void *arg) { struct CitContext libevent_client_CC; - CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread"); + CtdlFillSystemContext(&libevent_client_CC, "LibEv Thread"); // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); - CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n"); -/* - event_set(&queue_add_event, - event_add_pipe[0], - EV_READ|EV_PERSIST, - QueueEventAddCallback, - NULL); - - event_add(&queue_add_event, NULL); -*/ -/* - ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ); - ev_io_start(event_base, &queue_add_event); + CtdlLogPrintf(CTDL_DEBUG, "client_ev_thread() initializing\n"); -*/ event_base = ev_default_loop (EVFLAG_AUTO); -/// ev_loop(event_base, 0); ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ); ev_io_start(event_base, &queue_add_event); diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index 7e2ffb868..839f138af 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -87,55 +87,9 @@ #include "smtp_util.h" #include "event_client.h" +#include "smtpqueue.h" #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT -HashList *QItemHandlers = NULL; - -citthread_mutex_t ActiveQItemsLock; -HashList *ActiveQItems = NULL; - -int run_queue_now = 0; /* Set to 1 to ignore SMTP send retry times */ -int MsgCount = 0; -/*****************************************************************************/ -/* SMTP CLIENT (Queue Management) STUFF */ -/*****************************************************************************/ - -#define MaxAttempts 15 -typedef struct _delivery_attempt { - time_t when; - time_t retry; -}DeliveryAttempt; - -typedef struct _mailq_entry { - DeliveryAttempt Attempts[MaxAttempts]; - int nAttempts; - StrBuf *Recipient; - StrBuf *StatusMessage; - int Status; - int n; - int Active; -}MailQEntry; -void FreeMailQEntry(void *qv) -{ - MailQEntry *Q = qv; - FreeStrBuf(&Q->Recipient); - FreeStrBuf(&Q->StatusMessage); - free(Q); -} - -typedef struct queueitem { - long MessageID; - long QueMsgID; - int FailNow; - HashList *MailQEntries; - MailQEntry *Current; /* copy of the currently parsed item in the MailQEntries list; if null add a new one. */ - DeliveryAttempt LastAttempt; - long ActiveDeliveries; - StrBuf *EnvelopeFrom; - StrBuf *BounceTo; -} OneQueItem; -typedef void (*QItemHandler)(OneQueItem *Item, StrBuf *Line, const char **Pos); - /*****************************************************************************/ /* SMTP CLIENT (OUTBOUND PROCESSING) STUFF */ /*****************************************************************************/ @@ -207,6 +161,7 @@ typedef struct _stmp_out_msg { char name[1024]; char mailfrom[1024]; } SmtpOutMsg; + void DeleteSmtpOutMsg(void *v) { SmtpOutMsg *Msg = v; @@ -222,179 +177,34 @@ eNextState SMTP_C_ConnFail(void *Data); eNextState SMTP_C_DispatchReadDone(void *Data); eNextState SMTP_C_DispatchWriteDone(void *Data); eNextState SMTP_C_Terminate(void *Data); -eNextState SMTP_C_MXLookup(void *Data); +eReadState SMTP_C_ReadServerStatus(AsyncIO *IO); typedef eNextState (*SMTPReadHandler)(SmtpOutMsg *Msg); typedef eNextState (*SMTPSendHandler)(SmtpOutMsg *Msg); +#define SMTP_ERROR(WHICH_ERR, ERRSTR) do {\ + SendMsg->MyQEntry->Status = WHICH_ERR; \ + StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, HKEY(ERRSTR), 0); \ + return eAbort; } \ + while (0) +#define SMTP_VERROR(WHICH_ERR) do {\ + SendMsg->MyQEntry->Status = WHICH_ERR; \ + StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, &ChrPtr(SendMsg->IO.IOBuf)[4], -1, 0); \ + return eAbort; } \ + while (0) -void FreeQueItem(OneQueItem **Item) -{ - DeleteHash(&(*Item)->MailQEntries); - FreeStrBuf(&(*Item)->EnvelopeFrom); - FreeStrBuf(&(*Item)->BounceTo); - free(*Item); - Item = NULL; -} -void HFreeQueItem(void *Item) -{ - FreeQueItem((OneQueItem**)&Item); -} - - -/* inspect recipients with a status of: - * - 0 (no delivery yet attempted) - * - 3/4 (transient errors - * were experienced and it's time to try again) - */ -int CountActiveQueueEntries(OneQueItem *MyQItem) -{ - HashPos *It; - long len; - const char *Key; - void *vQE; - - MyQItem->ActiveDeliveries = 0; - It = GetNewHashPos(MyQItem->MailQEntries, 0); - while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)) - { - MailQEntry *ThisItem = vQE; - if ((ThisItem->Status == 0) || - (ThisItem->Status == 3) || - (ThisItem->Status == 4)) - { - MyQItem->ActiveDeliveries++; - ThisItem->Active = 1; - } - else - ThisItem->Active = 0; - } - DeleteHashPos(&It); - return MyQItem->ActiveDeliveries; -} - -OneQueItem *DeserializeQueueItem(StrBuf *RawQItem, long QueMsgID) -{ - OneQueItem *Item; - const char *pLine = NULL; - StrBuf *Line; - StrBuf *Token; - void *v; - - Item = (OneQueItem*)malloc(sizeof(OneQueItem)); - memset(Item, 0, sizeof(OneQueItem)); - Item->LastAttempt.retry = SMTP_RETRY_INTERVAL; - Item->MessageID = -1; - Item->QueMsgID = QueMsgID; - - citthread_mutex_lock(&ActiveQItemsLock); - if (GetHash(ActiveQItems, - IKEY(Item->QueMsgID), - &v)) - { - /* WHOOPS. somebody else is already working on this. */ - citthread_mutex_unlock(&ActiveQItemsLock); - FreeQueItem(&Item); - return NULL; - } - else { - /* mark our claim on this. */ - Put(ActiveQItems, - IKEY(Item->QueMsgID), - Item, - HFreeQueItem); - citthread_mutex_unlock(&ActiveQItemsLock); - } - - Token = NewStrBuf(); - Line = NewStrBufPlain(NULL, 128); - while (pLine != StrBufNOTNULL) { - const char *pItemPart = NULL; - void *vHandler; - - StrBufExtract_NextToken(Line, RawQItem, &pLine, '\n'); - if (StrLength(Line) == 0) continue; - StrBufExtract_NextToken(Token, Line, &pItemPart, '|'); - if (GetHash(QItemHandlers, SKEY(Token), &vHandler)) - { - QItemHandler H; - H = (QItemHandler) vHandler; - H(Item, Line, &pItemPart); - } - } - FreeStrBuf(&Line); - FreeStrBuf(&Token); - return Item; -} - -StrBuf *SerializeQueueItem(OneQueItem *MyQItem) -{ - StrBuf *QMessage; - HashPos *It; - const char *Key; - long len; - void *vQE; - - QMessage = NewStrBufPlain(NULL, SIZ); - StrBufPrintf(QMessage, "Content-type: %s\n", SPOOLMIME); - -// "attempted|%ld\n" "retry|%ld\n",, (long)time(NULL), (long)retry ); - StrBufAppendBufPlain(QMessage, HKEY("\nmsgid|"), 0); - StrBufAppendPrintf(QMessage, "%ld", MyQItem->MessageID); - - if (StrLength(MyQItem->BounceTo) > 0) { - StrBufAppendBufPlain(QMessage, HKEY("\nbounceto|"), 0); - StrBufAppendBuf(QMessage, MyQItem->BounceTo, 0); - } - - if (StrLength(MyQItem->EnvelopeFrom) > 0) { - StrBufAppendBufPlain(QMessage, HKEY("\nenvelope_from|"), 0); - StrBufAppendBuf(QMessage, MyQItem->EnvelopeFrom, 0); - } - - It = GetNewHashPos(MyQItem->MailQEntries, 0); - while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)) - { - MailQEntry *ThisItem = vQE; - int i; - - if (!ThisItem->Active) - continue; /* skip already sent ones from the spoolfile. */ +#define SMTP_IS_STATE(WHICH_STATE) (ChrPtr(SendMsg->IO.IOBuf)[0] == WHICH_STATE) - for (i=0; i < ThisItem->nAttempts; i++) { - StrBufAppendBufPlain(QMessage, HKEY("\nretry|"), 0); - StrBufAppendPrintf(QMessage, "%ld", - ThisItem->Attempts[i].retry); +#define SMTP_DBG_SEND() CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: > %s\n", SendMsg->n, ChrPtr(SendMsg->IO.IOBuf)) +#define SMTP_DBG_READ() CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: < %s\n", SendMsg->n, ChrPtr(SendMsg->IO.IOBuf)) - StrBufAppendBufPlain(QMessage, HKEY("\nattempted|"), 0); - StrBufAppendPrintf(QMessage, "%ld", - ThisItem->Attempts[i].when); - } - StrBufAppendBufPlain(QMessage, HKEY("\nremote|"), 0); - StrBufAppendBuf(QMessage, ThisItem->Recipient, 0); - StrBufAppendBufPlain(QMessage, HKEY("|"), 0); - StrBufAppendPrintf(QMessage, "%d", ThisItem->Status); - StrBufAppendBufPlain(QMessage, HKEY("|"), 0); - StrBufAppendBuf(QMessage, ThisItem->StatusMessage, 0); - } - DeleteHashPos(&It); - StrBufAppendBufPlain(QMessage, HKEY("\n"), 0); - return QMessage; -} void FinalizeMessageSend(SmtpOutMsg *Msg) { - int IDestructQueItem; - HashPos *It; - - citthread_mutex_lock(&ActiveQItemsLock); - Msg->MyQItem->ActiveDeliveries--; - IDestructQueItem = Msg->MyQItem->ActiveDeliveries == 0; - citthread_mutex_unlock(&ActiveQItemsLock); - - if (IDestructQueItem) { + if (DecreaseQReference(Msg->MyQItem)) + { int nRemain; StrBuf *MsgData; @@ -408,8 +218,6 @@ void FinalizeMessageSend(SmtpOutMsg *Msg) */ CtdlDeleteMessages(SMTP_SPOOLOUT_ROOM, &Msg->MyQItem->QueMsgID, 1, ""); - /* Generate 'bounce' messages * / - smtp_do_bounce(instr); */ if (nRemain > 0) { struct CtdlMessage *msg; msg = malloc(sizeof(struct CtdlMessage)); @@ -418,75 +226,131 @@ void FinalizeMessageSend(SmtpOutMsg *Msg) msg->cm_anon_type = MES_NORMAL; msg->cm_format_type = FMT_RFC822; msg->cm_fields['M'] = SmashStrBuf(&MsgData); + /* Generate 'bounce' messages */ + smtp_do_bounce(msg->cm_fields['M'], + Msg->msgtext); CtdlSubmitMsg(msg, NULL, SMTP_SPOOLOUT_ROOM, QP_EADDR); CtdlFreeMessage(msg); } - It = GetNewHashPos(Msg->MyQItem->MailQEntries, 0); - citthread_mutex_lock(&ActiveQItemsLock); - { - GetHashPosFromKey(ActiveQItems, IKEY(Msg->MyQItem->MessageID), It); - DeleteEntryFromHash(ActiveQItems, It); - } - citthread_mutex_unlock(&ActiveQItemsLock); - DeleteHashPos(&It); + + RemoveQItem(Msg->MyQItem); } -/// TODO : else free message... close(Msg->IO.sock); DeleteSmtpOutMsg(Msg); } -eReadState SMTP_C_ReadServerStatus(AsyncIO *IO) + + + +void get_one_mx_host_ip_done(void *Ctx, + int status, + int timeouts, + struct hostent *hostent) { - eReadState Finished = eBufferNotEmpty; + AsyncIO *IO = Ctx; + SmtpOutMsg *SendMsg = IO->Data; + if ((status == ARES_SUCCESS) && (hostent != NULL) ) { + CtdlLogPrintf(CTDL_DEBUG, + "SMTP client[%ld]: connecting to %s : %d ...\n", + SendMsg->n, + SendMsg->mx_host, + SendMsg->IO.dport); - while (Finished == eBufferNotEmpty) { - Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); - - switch (Finished) { - case eMustReadMore: /// read new from socket... - return Finished; - break; - case eBufferNotEmpty: /* shouldn't happen... */ - case eReadSuccess: /// done for now... - if (StrLength(IO->IOBuf) < 4) - continue; - if (ChrPtr(IO->IOBuf)[3] == '-') - Finished = eBufferNotEmpty; - else - return Finished; - break; - case eReadFail: /// WHUT? - ///todo: shut down! - break; + + SendMsg->IO.HEnt = hostent; + InitEventIO(IO, SendMsg, + SMTP_C_DispatchReadDone, + SMTP_C_DispatchWriteDone, + SMTP_C_Terminate, + SMTP_C_Timeout, + SMTP_C_ConnFail, + SMTP_C_ReadServerStatus, + 1); + + } +} + +const unsigned short DefaultMXPort = 25; +void get_one_mx_host_ip(SmtpOutMsg *SendMsg) +{ + //char *endpart; + //char buf[SIZ]; + + SendMsg->IO.dport = DefaultMXPort; + + +/* TODO: Relay! + *SendMsg->mx_user = '\0'; + *SendMsg->mx_pass = '\0'; + if (num_tokens(buf, '@') > 1) { + strcpy (SendMsg->mx_user, buf); + endpart = strrchr(SendMsg->mx_user, '@'); + *endpart = '\0'; + strcpy (SendMsg->mx_host, endpart + 1); + endpart = strrchr(SendMsg->mx_user, ':'); + if (endpart != NULL) { + strcpy(SendMsg->mx_pass, endpart+1); + *endpart = '\0'; } + + endpart = strrchr(SendMsg->mx_host, ':'); + if (endpart != 0){ + *endpart = '\0'; + strcpy(SendMsg->mx_port, endpart + 1); + } } - return Finished; + else +*/ + SendMsg->mx_host = SendMsg->CurrMX->host; + SendMsg->CurrMX = SendMsg->CurrMX->next; + + CtdlLogPrintf(CTDL_DEBUG, + "SMTP client[%ld]: looking up %s : %d ...\n", + SendMsg->n, + SendMsg->mx_host); + + ares_gethostbyname(SendMsg->IO.DNSChannel, + SendMsg->mx_host, + AF_INET6, /* it falls back to ipv4 in doubt... */ + get_one_mx_host_ip_done, + &SendMsg->IO); } -/** - * this one has to have the context for loading the message via the redirect buffer... - */ -StrBuf *smtp_load_msg(OneQueItem *MyQItem) + +eNextState smtp_resolve_mx_done(void *data) { - CitContext *CCC=CC; - StrBuf *SendMsg; - - CCC->redirect_buffer = NewStrBufPlain(NULL, SIZ); - CtdlOutputMsg(MyQItem->MessageID, MT_RFC822, HEADERS_ALL, 0, 1, NULL, (ESC_DOT|SUPPRESS_ENV_TO) ); - SendMsg = CCC->redirect_buffer; - CCC->redirect_buffer = NULL; - if ((StrLength(SendMsg) > 0) && - ChrPtr(SendMsg)[StrLength(SendMsg) - 1] != '\n') { - CtdlLogPrintf(CTDL_WARNING, - "SMTP client[%ld]: Possible problem: message did not " - "correctly terminate. (expecting 0x10, got 0x%02x)\n", - MsgCount, //yes uncool, but best choice here... - ChrPtr(SendMsg)[StrLength(SendMsg) - 1] ); - StrBufAppendBufPlain(SendMsg, HKEY("\r\n"), 0); + AsyncIO *IO = data; + SmtpOutMsg * SendMsg = IO->Data; + + SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); + SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); + SendMsg->IO.IOBuf = NewStrBuf(); + SendMsg->IO.ErrMsg = SendMsg->MyQEntry->StatusMessage; + + SendMsg->CurrMX = SendMsg->AllMX = IO->VParsedDNSReply; + //// TODO: should we remove the current ares context??? + get_one_mx_host_ip(SendMsg); + return 0; +} + + +int resolve_mx_records(void *Ctx) +{ + SmtpOutMsg * SendMsg = Ctx; + + if (!QueueQuery(ns_t_mx, + SendMsg->node, + &SendMsg->IO, + smtp_resolve_mx_done)) + { + SendMsg->MyQEntry->Status = 5; + StrBufPrintf(SendMsg->MyQEntry->StatusMessage, + "No MX hosts found for <%s>", SendMsg->node); + return 0; ///////TODO: abort! } - return SendMsg; + return 0; } @@ -563,98 +427,41 @@ int smtp_resolve_recipients(SmtpOutMsg *SendMsg) } -#define SMTP_ERROR(WHICH_ERR, ERRSTR) do {\ - SendMsg->MyQEntry->Status = WHICH_ERR; \ - StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, HKEY(ERRSTR), 0); \ - return eAbort; } \ - while (0) -#define SMTP_VERROR(WHICH_ERR) do {\ - SendMsg->MyQEntry->Status = WHICH_ERR; \ - StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, &ChrPtr(SendMsg->IO.IOBuf)[4], -1, 0); \ - return eAbort; } \ - while (0) - -#define SMTP_IS_STATE(WHICH_STATE) (ChrPtr(SendMsg->IO.IOBuf)[0] == WHICH_STATE) - -#define SMTP_DBG_SEND() CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: > %s\n", SendMsg->n, ChrPtr(SendMsg->IO.IOBuf)) -#define SMTP_DBG_READ() CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: < %s\n", SendMsg->n, ChrPtr(SendMsg->IO.IOBuf)) - -void get_one_mx_host_name_done(void *Ctx, - int status, - int timeouts, - struct hostent *hostent) +void smtp_try(OneQueItem *MyQItem, + MailQEntry *MyQEntry, + StrBuf *MsgText, + int KeepMsgText, /* KeepMsgText allows us to use MsgText as ours. */ + int MsgCount) { - AsyncIO *IO = Ctx; - SmtpOutMsg *SendMsg = IO->Data; - if ((status == ARES_SUCCESS) && (hostent != NULL) ) { - CtdlLogPrintf(CTDL_DEBUG, - "SMTP client[%ld]: connecting to %s : %d ...\n", - SendMsg->n, - SendMsg->mx_host, - SendMsg->IO.dport); + SmtpOutMsg * SendMsg; + SendMsg = (SmtpOutMsg *) malloc(sizeof(SmtpOutMsg)); + memset(SendMsg, 0, sizeof(SmtpOutMsg)); + SendMsg->IO.sock = (-1); + SendMsg->n = MsgCount++; + SendMsg->MyQEntry = MyQEntry; + SendMsg->MyQItem = MyQItem; + SendMsg->IO.Data = SendMsg; + if (KeepMsgText) + SendMsg->msgtext = MsgText; + else + SendMsg->msgtext = NewStrBufDup(MsgText); - SendMsg->IO.HEnt = hostent; - InitEventIO(IO, SendMsg, - SMTP_C_DispatchReadDone, - SMTP_C_DispatchWriteDone, - SMTP_C_Terminate, - SMTP_C_Timeout, - SMTP_C_ConnFail, - SMTP_C_ReadServerStatus, - 1); + smtp_resolve_recipients(SendMsg); - } + QueueEventContext(SendMsg, + &SendMsg->IO, + resolve_mx_records); } -const unsigned short DefaultMXPort = 25; -void connect_one_smtpsrv(SmtpOutMsg *SendMsg) -{ - //char *endpart; - //char buf[SIZ]; - SendMsg->IO.dport = DefaultMXPort; -/* TODO: Relay! - *SendMsg->mx_user = '\0'; - *SendMsg->mx_pass = '\0'; - if (num_tokens(buf, '@') > 1) { - strcpy (SendMsg->mx_user, buf); - endpart = strrchr(SendMsg->mx_user, '@'); - *endpart = '\0'; - strcpy (SendMsg->mx_host, endpart + 1); - endpart = strrchr(SendMsg->mx_user, ':'); - if (endpart != NULL) { - strcpy(SendMsg->mx_pass, endpart+1); - *endpart = '\0'; - } - - endpart = strrchr(SendMsg->mx_host, ':'); - if (endpart != 0){ - *endpart = '\0'; - strcpy(SendMsg->mx_port, endpart + 1); - } - } - else -*/ - SendMsg->mx_host = SendMsg->CurrMX->host; - SendMsg->CurrMX = SendMsg->CurrMX->next; - - CtdlLogPrintf(CTDL_DEBUG, - "SMTP client[%ld]: looking up %s : %d ...\n", - SendMsg->n, - SendMsg->mx_host); - - ares_gethostbyname(SendMsg->IO.DNSChannel, - SendMsg->mx_host, - AF_INET6, /* it falls back to ipv4 in doubt... */ - get_one_mx_host_name_done, - &SendMsg->IO); -} - +/*****************************************************************************/ +/* SMTP CLIENT STATE CALLBACKS */ +/*****************************************************************************/ eNextState SMTPC_read_greeting(SmtpOutMsg *SendMsg) { /* Process the SMTP greeting from the server */ @@ -903,372 +710,10 @@ eNextState SMTPC_send_dummy(SmtpOutMsg *SendMsg) return eReadMessage; } -eNextState smtp_resolve_mx_done(void *data) -{/// VParsedDNSReply - AsyncIO *IO = data; - SmtpOutMsg * SendMsg = IO->Data; - - SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); - SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); - SendMsg->IO.IOBuf = NewStrBuf(); - SendMsg->IO.ErrMsg = SendMsg->MyQEntry->StatusMessage; - - //// connect_one_smtpsrv_xamine_result - SendMsg->CurrMX = SendMsg->AllMX = IO->VParsedDNSReply; - //// TODO: should we remove the current ares context??? - connect_one_smtpsrv(SendMsg); - return 0; -} - - - -int resolve_mx_records(void *Ctx) -{ - SmtpOutMsg * SendMsg = Ctx; - - if (!QueueQuery(ns_t_mx, - SendMsg->node, - &SendMsg->IO, - smtp_resolve_mx_done)) - { - SendMsg->MyQEntry->Status = 5; - StrBufPrintf(SendMsg->MyQEntry->StatusMessage, - "No MX hosts found for <%s>", SendMsg->node); - return 0; ///////TODO: abort! - } - return 0; -} - -void smtp_try(OneQueItem *MyQItem, - MailQEntry *MyQEntry, - StrBuf *MsgText, - int KeepMsgText) /* KeepMsgText allows us to use MsgText as ours. */ -{ - SmtpOutMsg * SendMsg; - - SendMsg = (SmtpOutMsg *) malloc(sizeof(SmtpOutMsg)); - memset(SendMsg, 0, sizeof(SmtpOutMsg)); - SendMsg->IO.sock = (-1); - SendMsg->n = MsgCount++; - SendMsg->MyQEntry = MyQEntry; - SendMsg->MyQItem = MyQItem; - SendMsg->IO.Data = SendMsg; - if (KeepMsgText) - SendMsg->msgtext = MsgText; - else - SendMsg->msgtext = NewStrBufDup(MsgText); - - smtp_resolve_recipients(SendMsg); - - QueueEventContext(SendMsg, - &SendMsg->IO, - resolve_mx_records); -} - - - -void NewMailQEntry(OneQueItem *Item) -{ - Item->Current = (MailQEntry*) malloc(sizeof(MailQEntry)); - memset(Item->Current, 0, sizeof(MailQEntry)); - - if (Item->MailQEntries == NULL) - Item->MailQEntries = NewHash(1, Flathash); - Item->Current->n = GetCount(Item->MailQEntries); - Put(Item->MailQEntries, IKEY(Item->Current->n), Item->Current, FreeMailQEntry); -} - -void QItem_Handle_MsgID(OneQueItem *Item, StrBuf *Line, const char **Pos) -{ - Item->MessageID = StrBufExtractNext_int(Line, Pos, '|'); -} - -void QItem_Handle_EnvelopeFrom(OneQueItem *Item, StrBuf *Line, const char **Pos) -{ - if (Item->EnvelopeFrom == NULL) - Item->EnvelopeFrom = NewStrBufPlain(NULL, StrLength(Line)); - StrBufExtract_NextToken(Item->EnvelopeFrom, Line, Pos, '|'); -} - -void QItem_Handle_BounceTo(OneQueItem *Item, StrBuf *Line, const char **Pos) -{ - if (Item->BounceTo == NULL) - Item->BounceTo = NewStrBufPlain(NULL, StrLength(Line)); - StrBufExtract_NextToken(Item->BounceTo, Line, Pos, '|'); -} - -void QItem_Handle_Recipient(OneQueItem *Item, StrBuf *Line, const char **Pos) -{ - if (Item->Current == NULL) - NewMailQEntry(Item); - if (Item->Current->Recipient == NULL) - Item->Current->Recipient = NewStrBufPlain(NULL, StrLength(Line)); - StrBufExtract_NextToken(Item->Current->Recipient, Line, Pos, '|'); - Item->Current->Status = StrBufExtractNext_int(Line, Pos, '|'); - StrBufExtract_NextToken(Item->Current->StatusMessage, Line, Pos, '|'); - Item->Current = NULL; // TODO: is this always right? -} - - -void QItem_Handle_retry(OneQueItem *Item, StrBuf *Line, const char **Pos) -{ - if (Item->Current == NULL) - NewMailQEntry(Item); - if (Item->Current->Attempts[Item->Current->nAttempts].retry != 0) - Item->Current->nAttempts++; - if (Item->Current->nAttempts > MaxAttempts) { - Item->FailNow = 1; - return; - } - Item->Current->Attempts[Item->Current->nAttempts].retry = StrBufExtractNext_int(Line, Pos, '|'); -} - -void QItem_Handle_Attempted(OneQueItem *Item, StrBuf *Line, const char **Pos) -{ - if (Item->Current == NULL) - NewMailQEntry(Item); - if (Item->Current->Attempts[Item->Current->nAttempts].when != 0) - Item->Current->nAttempts++; - if (Item->Current->nAttempts > MaxAttempts) { - Item->FailNow = 1; - return; - } - - Item->Current->Attempts[Item->Current->nAttempts].when = StrBufExtractNext_int(Line, Pos, '|'); - if (Item->Current->Attempts[Item->Current->nAttempts].when > Item->LastAttempt.when) - { - Item->LastAttempt.when = Item->Current->Attempts[Item->Current->nAttempts].when; - Item->LastAttempt.retry = Item->Current->Attempts[Item->Current->nAttempts].retry * 2; - if (Item->LastAttempt.retry > SMTP_RETRY_MAX) - Item->LastAttempt.retry = SMTP_RETRY_MAX; - } -} - - - - -/* - * smtp_do_procmsg() - * - * Called by smtp_do_queue() to handle an individual message. - */ -void smtp_do_procmsg(long msgnum, void *userdata) { - struct CtdlMessage *msg = NULL; - char *instr = NULL; - StrBuf *PlainQItem; - OneQueItem *MyQItem; - char *pch; - HashPos *It; - void *vQE; - long len; - const char *Key; - - CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum); - ///strcpy(envelope_from, ""); - - msg = CtdlFetchMessage(msgnum, 1); - if (msg == NULL) { - CtdlLogPrintf(CTDL_ERR, "SMTP Queue: tried %ld but no such message!\n", msgnum); - return; - } - - pch = instr = msg->cm_fields['M']; - - /* Strip out the headers (no not amd any other non-instruction) line */ - while (pch != NULL) { - pch = strchr(pch, '\n'); - if ((pch != NULL) && (*(pch + 1) == '\n')) { - instr = pch + 2; - pch = NULL; - } - } - PlainQItem = NewStrBufPlain(instr, -1); - CtdlFreeMessage(msg); - MyQItem = DeserializeQueueItem(PlainQItem, msgnum); - FreeStrBuf(&PlainQItem); - - if (MyQItem == NULL) { - CtdlLogPrintf(CTDL_ERR, "SMTP Queue: Msg No %ld: already in progress!\n", msgnum); - return; /* s.b. else is already processing... */ - } - - /* - * Postpone delivery if we've already tried recently. - * / - if (((time(NULL) - MyQItem->LastAttempt.when) < MyQItem->LastAttempt.retry) && (run_queue_now == 0)) { - CtdlLogPrintf(CTDL_DEBUG, "SMTP client: Retry time not yet reached.\n"); - - It = GetNewHashPos(MyQItem->MailQEntries, 0); - citthread_mutex_lock(&ActiveQItemsLock); - { - GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); - DeleteEntryFromHash(ActiveQItems, It); - } - citthread_mutex_unlock(&ActiveQItemsLock); - ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? - DeleteHashPos(&It); - return; - }// TODO: reenable me.*/ - - /* - * Bail out if there's no actual message associated with this - */ - if (MyQItem->MessageID < 0L) { - CtdlLogPrintf(CTDL_ERR, "SMTP Queue: no 'msgid' directive found!\n"); - It = GetNewHashPos(MyQItem->MailQEntries, 0); - citthread_mutex_lock(&ActiveQItemsLock); - { - GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); - DeleteEntryFromHash(ActiveQItems, It); - } - citthread_mutex_unlock(&ActiveQItemsLock); - DeleteHashPos(&It); - ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? - return; - } - - It = GetNewHashPos(MyQItem->MailQEntries, 0); - while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)) - { - MailQEntry *ThisItem = vQE; - CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Task: <%s> %d\n", ChrPtr(ThisItem->Recipient), ThisItem->Active); - } - DeleteHashPos(&It); - - CountActiveQueueEntries(MyQItem); - if (MyQItem->ActiveDeliveries > 0) - { - int i = 1; - StrBuf *Msg = smtp_load_msg(MyQItem); - It = GetNewHashPos(MyQItem->MailQEntries, 0); - while ((i <= MyQItem->ActiveDeliveries) && - (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))) - { - MailQEntry *ThisItem = vQE; - if (ThisItem->Active == 1) { - CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Trying <%s>\n", ChrPtr(ThisItem->Recipient)); - smtp_try(MyQItem, ThisItem, Msg, (i == MyQItem->ActiveDeliveries)); - i++; - } - } - DeleteHashPos(&It); - } - else - { - It = GetNewHashPos(MyQItem->MailQEntries, 0); - citthread_mutex_lock(&ActiveQItemsLock); - { - GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); - DeleteEntryFromHash(ActiveQItems, It); - } - citthread_mutex_unlock(&ActiveQItemsLock); - DeleteHashPos(&It); - ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? - -// TODO: bounce & delete? - - } -} - /*****************************************************************************/ -/* SMTP UTILITY COMMANDS */ +/* SMTP CLIENT DISPATCHER */ /*****************************************************************************/ - -void cmd_smtp(char *argbuf) { - char cmd[64]; - char node[256]; - char buf[1024]; - int i; - int num_mxhosts; - - if (CtdlAccessCheck(ac_aide)) return; - - extract_token(cmd, argbuf, 0, '|', sizeof cmd); - - if (!strcasecmp(cmd, "mx")) { - extract_token(node, argbuf, 1, '|', sizeof node); - num_mxhosts = getmx(buf, node); - cprintf("%d %d MX hosts listed for %s\n", - LISTING_FOLLOWS, num_mxhosts, node); - for (i=0; iroom, SMTP_SPOOLOUT_ROOM) != 0) { - CtdlLogPrintf(CTDL_ERR, "Cannot find room <%s>\n", SMTP_SPOOLOUT_ROOM); - } - else { - num_processed = CtdlForEachMessage(MSGS_ALL, 0L, NULL, SPOOLMIME, NULL, smtp_do_procmsg, NULL); - } - CtdlLogPrintf(CTDL_INFO, "SMTP client: queue run completed; %d messages processed\n", num_processed); - CtdlThreadSleep(60); - } - - CtdlClearSystemContext(); - return(NULL); -} - - -/* - * Initialize the SMTP outbound queue - */ -void smtp_init_spoolout(void) { - struct ctdlroom qrbuf; - - /* - * Create the room. This will silently fail if the room already - * exists, and that's perfectly ok, because we want it to exist. - */ - CtdlCreateRoom(SMTP_SPOOLOUT_ROOM, 3, "", 0, 1, 0, VIEW_MAILBOX); - - /* - * Make sure it's set to be a "system room" so it doesn't show up - * in the nown rooms list for Aides. - */ - if (CtdlGetRoomLock(&qrbuf, SMTP_SPOOLOUT_ROOM) == 0) { - qrbuf.QRflags2 |= QR2_SYSTEM; - CtdlPutRoomLock(&qrbuf); - } -} - - SMTPReadHandler ReadHandlers[eMaxSMTPC] = { SMTPC_read_greeting, SMTPC_read_EHLO_reply, @@ -1281,7 +726,6 @@ SMTPReadHandler ReadHandlers[eMaxSMTPC] = { SMTPC_read_data_body_reply, SMTPC_read_QUIT_reply }; - SMTPSendHandler SendHandlers[eMaxSMTPC] = { SMTPC_send_dummy, /* we don't send a greeting, the server does... */ SMTPC_send_EHLO, @@ -1294,21 +738,35 @@ SMTPSendHandler SendHandlers[eMaxSMTPC] = { SMTPC_send_terminate_data_body, SMTPC_send_QUIT }; +eNextState SMTP_C_DispatchReadDone(void *Data) +{ + SmtpOutMsg *pMsg = Data; + eNextState rc = ReadHandlers[pMsg->State](pMsg); + pMsg->State++; + return rc; +} +eNextState SMTP_C_DispatchWriteDone(void *Data) +{ + SmtpOutMsg *pMsg = Data; + return SendHandlers[pMsg->State](pMsg); +} + +/*****************************************************************************/ +/* SMTP CLIENT ERROR CATCHERS */ +/*****************************************************************************/ eNextState SMTP_C_Terminate(void *Data) { SmtpOutMsg *pMsg = Data; FinalizeMessageSend(pMsg); return 0; } - eNextState SMTP_C_Timeout(void *Data) { SmtpOutMsg *pMsg = Data; FinalizeMessageSend(pMsg); return 0; } - eNextState SMTP_C_ConnFail(void *Data) { SmtpOutMsg *pMsg = Data; @@ -1316,56 +774,41 @@ eNextState SMTP_C_ConnFail(void *Data) return 0; } -eNextState SMTP_C_DispatchReadDone(void *Data) -{ - SmtpOutMsg *pMsg = Data; - eNextState rc = ReadHandlers[pMsg->State](pMsg); - pMsg->State++; - return rc; -} -eNextState SMTP_C_DispatchWriteDone(void *Data) +/** + * @brief lineread Handler; understands when to read more SMTP lines, and when this is a one-lined reply. + */ +eReadState SMTP_C_ReadServerStatus(AsyncIO *IO) { - SmtpOutMsg *pMsg = Data; - return SendHandlers[pMsg->State](pMsg); - -} + eReadState Finished = eBufferNotEmpty; -void smtp_evc_cleanup(void) -{ - DeleteHash(&QItemHandlers); - DeleteHash(&ActiveQItems); + while (Finished == eBufferNotEmpty) { + Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); + + switch (Finished) { + case eMustReadMore: /// read new from socket... + return Finished; + break; + case eBufferNotEmpty: /* shouldn't happen... */ + case eReadSuccess: /// done for now... + if (StrLength(IO->IOBuf) < 4) + continue; + if (ChrPtr(IO->IOBuf)[3] == '-') + Finished = eBufferNotEmpty; + else + return Finished; + break; + case eReadFail: /// WHUT? + ///todo: shut down! + break; + } + } + return Finished; } + #endif CTDL_MODULE_INIT(smtp_eventclient) { -#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT - if (!threading) - { - ActiveQItems = NewHash(1, Flathash); - citthread_mutex_init(&ActiveQItemsLock, NULL); - - QItemHandlers = NewHash(0, NULL); - - Put(QItemHandlers, HKEY("msgid"), QItem_Handle_MsgID, reference_free_handler); - Put(QItemHandlers, HKEY("envelope_from"), QItem_Handle_EnvelopeFrom, reference_free_handler); - Put(QItemHandlers, HKEY("retry"), QItem_Handle_retry, reference_free_handler); - Put(QItemHandlers, HKEY("attempted"), QItem_Handle_Attempted, reference_free_handler); - Put(QItemHandlers, HKEY("remote"), QItem_Handle_Recipient, reference_free_handler); - Put(QItemHandlers, HKEY("bounceto"), QItem_Handle_BounceTo, reference_free_handler); -///submitted /TODO: flush qitemhandlers on exit - - - smtp_init_spoolout(); - - CtdlRegisterCleanupHook(smtp_evc_cleanup); - CtdlThreadCreate("SMTPEvent Send", CTDLTHREAD_BIGSTACK, smtp_queue_thread, NULL); - - CtdlRegisterProtoHook(cmd_smtp, "SMTP", "SMTP utility commands"); - } -#endif - - /* return our Subversion id for the Log */ return "smtpeventclient"; } diff --git a/citadel/modules/smtp/serv_smtpqueue.c b/citadel/modules/smtp/serv_smtpqueue.c new file mode 100644 index 000000000..a05d2f26b --- /dev/null +++ b/citadel/modules/smtp/serv_smtpqueue.c @@ -0,0 +1,670 @@ +/* + * This module is an SMTP and ESMTP implementation for the Citadel system. + * It is compliant with all of the following: + * + * RFC 821 - Simple Mail Transfer Protocol + * RFC 876 - Survey of SMTP Implementations + * RFC 1047 - Duplicate messages and SMTP + * RFC 1652 - 8 bit MIME + * RFC 1869 - Extended Simple Mail Transfer Protocol + * RFC 1870 - SMTP Service Extension for Message Size Declaration + * RFC 2033 - Local Mail Transfer Protocol + * RFC 2197 - SMTP Service Extension for Command Pipelining + * RFC 2476 - Message Submission + * RFC 2487 - SMTP Service Extension for Secure SMTP over TLS + * RFC 2554 - SMTP Service Extension for Authentication + * RFC 2821 - Simple Mail Transfer Protocol + * RFC 2822 - Internet Message Format + * RFC 2920 - SMTP Service Extension for Command Pipelining + * + * The VRFY and EXPN commands have been removed from this implementation + * because nobody uses these commands anymore, except for spammers. + * + * Copyright (c) 1998-2009 by the citadel.org team + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include "sysdep.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if TIME_WITH_SYS_TIME +# include +# include +#else +# if HAVE_SYS_TIME_H +# include +# else +# include +# endif +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include "citadel.h" +#include "server.h" +#include "citserver.h" +#include "support.h" +#include "config.h" +#include "control.h" +#include "user_ops.h" +#include "database.h" +#include "msgbase.h" +#include "internet_addressing.h" +#include "genstamp.h" +#include "domain.h" +#include "clientsocket.h" +#include "locate_host.h" +#include "citadel_dirs.h" + +#include "ctdl_module.h" + +#include "smtp_util.h" +#include "smtpqueue.h" +#include "event_client.h" + +HashList *QItemHandlers = NULL; + +citthread_mutex_t ActiveQItemsLock; +HashList *ActiveQItems = NULL; + +int MsgCount = 0; +int run_queue_now = 0; /* Set to 1 to ignore SMTP send retry times */ + +void smtp_try(OneQueItem *MyQItem, + MailQEntry *MyQEntry, + StrBuf *MsgText, + int KeepMsgText, /* KeepMsgText allows us to use MsgText as ours. */ + int MsgCount); + + +void smtp_evq_cleanup(void) +{ + citthread_mutex_lock(&ActiveQItemsLock); + DeleteHash(&QItemHandlers); + DeleteHash(&ActiveQItems); + citthread_mutex_unlock(&ActiveQItemsLock); + citthread_mutex_destroy(&ActiveQItemsLock); +} + +int DecreaseQReference(OneQueItem *MyQItem) +{ + int IDestructQueItem; + + citthread_mutex_lock(&ActiveQItemsLock); + MyQItem->ActiveDeliveries--; + IDestructQueItem = MyQItem->ActiveDeliveries == 0; + citthread_mutex_unlock(&ActiveQItemsLock); + return IDestructQueItem; +} + +void RemoveQItem(OneQueItem *MyQItem) +{ + HashPos *It; + + It = GetNewHashPos(MyQItem->MailQEntries, 0); + citthread_mutex_lock(&ActiveQItemsLock); + { + GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); + DeleteEntryFromHash(ActiveQItems, It); + } + citthread_mutex_unlock(&ActiveQItemsLock); + DeleteHashPos(&It); +} + + + +void FreeMailQEntry(void *qv) +{ + MailQEntry *Q = qv; + FreeStrBuf(&Q->Recipient); + FreeStrBuf(&Q->StatusMessage); + free(Q); +} +void FreeQueItem(OneQueItem **Item) +{ + DeleteHash(&(*Item)->MailQEntries); + FreeStrBuf(&(*Item)->EnvelopeFrom); + FreeStrBuf(&(*Item)->BounceTo); + free(*Item); + Item = NULL; +} +void HFreeQueItem(void *Item) +{ + FreeQueItem((OneQueItem**)&Item); +} + +/* inspect recipients with a status of: + * - 0 (no delivery yet attempted) + * - 3/4 (transient errors + * were experienced and it's time to try again) + */ +int CountActiveQueueEntries(OneQueItem *MyQItem) +{ + HashPos *It; + long len; + const char *Key; + void *vQE; + + MyQItem->ActiveDeliveries = 0; + It = GetNewHashPos(MyQItem->MailQEntries, 0); + while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)) + { + MailQEntry *ThisItem = vQE; + if ((ThisItem->Status == 0) || + (ThisItem->Status == 3) || + (ThisItem->Status == 4)) + { + MyQItem->ActiveDeliveries++; + ThisItem->Active = 1; + } + else + ThisItem->Active = 0; + } + DeleteHashPos(&It); + return MyQItem->ActiveDeliveries; +} + +OneQueItem *DeserializeQueueItem(StrBuf *RawQItem, long QueMsgID) +{ + OneQueItem *Item; + const char *pLine = NULL; + StrBuf *Line; + StrBuf *Token; + void *v; + + Item = (OneQueItem*)malloc(sizeof(OneQueItem)); + memset(Item, 0, sizeof(OneQueItem)); + Item->LastAttempt.retry = SMTP_RETRY_INTERVAL; + Item->MessageID = -1; + Item->QueMsgID = QueMsgID; + + citthread_mutex_lock(&ActiveQItemsLock); + if (GetHash(ActiveQItems, + IKEY(Item->QueMsgID), + &v)) + { + /* WHOOPS. somebody else is already working on this. */ + citthread_mutex_unlock(&ActiveQItemsLock); + FreeQueItem(&Item); + return NULL; + } + else { + /* mark our claim on this. */ + Put(ActiveQItems, + IKEY(Item->QueMsgID), + Item, + HFreeQueItem); + citthread_mutex_unlock(&ActiveQItemsLock); + } + + Token = NewStrBuf(); + Line = NewStrBufPlain(NULL, 128); + while (pLine != StrBufNOTNULL) { + const char *pItemPart = NULL; + void *vHandler; + + StrBufExtract_NextToken(Line, RawQItem, &pLine, '\n'); + if (StrLength(Line) == 0) continue; + StrBufExtract_NextToken(Token, Line, &pItemPart, '|'); + if (GetHash(QItemHandlers, SKEY(Token), &vHandler)) + { + QItemHandler H; + H = (QItemHandler) vHandler; + H(Item, Line, &pItemPart); + } + } + FreeStrBuf(&Line); + FreeStrBuf(&Token); + return Item; +} + +StrBuf *SerializeQueueItem(OneQueItem *MyQItem) +{ + StrBuf *QMessage; + HashPos *It; + const char *Key; + long len; + void *vQE; + + QMessage = NewStrBufPlain(NULL, SIZ); + StrBufPrintf(QMessage, "Content-type: %s\n", SPOOLMIME); + +// "attempted|%ld\n" "retry|%ld\n",, (long)time(NULL), (long)retry ); + StrBufAppendBufPlain(QMessage, HKEY("\nmsgid|"), 0); + StrBufAppendPrintf(QMessage, "%ld", MyQItem->MessageID); + + if (StrLength(MyQItem->BounceTo) > 0) { + StrBufAppendBufPlain(QMessage, HKEY("\nbounceto|"), 0); + StrBufAppendBuf(QMessage, MyQItem->BounceTo, 0); + } + + if (StrLength(MyQItem->EnvelopeFrom) > 0) { + StrBufAppendBufPlain(QMessage, HKEY("\nenvelope_from|"), 0); + StrBufAppendBuf(QMessage, MyQItem->EnvelopeFrom, 0); + } + + It = GetNewHashPos(MyQItem->MailQEntries, 0); + while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)) + { + MailQEntry *ThisItem = vQE; + int i; + + if (!ThisItem->Active) + continue; /* skip already sent ones from the spoolfile. */ + + for (i=0; i < ThisItem->nAttempts; i++) { + StrBufAppendBufPlain(QMessage, HKEY("\nretry|"), 0); + StrBufAppendPrintf(QMessage, "%ld", + ThisItem->Attempts[i].retry); + + StrBufAppendBufPlain(QMessage, HKEY("\nattempted|"), 0); + StrBufAppendPrintf(QMessage, "%ld", + ThisItem->Attempts[i].when); + } + StrBufAppendBufPlain(QMessage, HKEY("\nremote|"), 0); + StrBufAppendBuf(QMessage, ThisItem->Recipient, 0); + StrBufAppendBufPlain(QMessage, HKEY("|"), 0); + StrBufAppendPrintf(QMessage, "%d", ThisItem->Status); + StrBufAppendBufPlain(QMessage, HKEY("|"), 0); + StrBufAppendBuf(QMessage, ThisItem->StatusMessage, 0); + } + DeleteHashPos(&It); + StrBufAppendBufPlain(QMessage, HKEY("\n"), 0); + return QMessage; +} + + + + + +void NewMailQEntry(OneQueItem *Item) +{ + Item->Current = (MailQEntry*) malloc(sizeof(MailQEntry)); + memset(Item->Current, 0, sizeof(MailQEntry)); + + if (Item->MailQEntries == NULL) + Item->MailQEntries = NewHash(1, Flathash); + Item->Current->n = GetCount(Item->MailQEntries); + Put(Item->MailQEntries, IKEY(Item->Current->n), Item->Current, FreeMailQEntry); +} + +void QItem_Handle_MsgID(OneQueItem *Item, StrBuf *Line, const char **Pos) +{ + Item->MessageID = StrBufExtractNext_int(Line, Pos, '|'); +} + +void QItem_Handle_EnvelopeFrom(OneQueItem *Item, StrBuf *Line, const char **Pos) +{ + if (Item->EnvelopeFrom == NULL) + Item->EnvelopeFrom = NewStrBufPlain(NULL, StrLength(Line)); + StrBufExtract_NextToken(Item->EnvelopeFrom, Line, Pos, '|'); +} + +void QItem_Handle_BounceTo(OneQueItem *Item, StrBuf *Line, const char **Pos) +{ + if (Item->BounceTo == NULL) + Item->BounceTo = NewStrBufPlain(NULL, StrLength(Line)); + StrBufExtract_NextToken(Item->BounceTo, Line, Pos, '|'); +} + +void QItem_Handle_Recipient(OneQueItem *Item, StrBuf *Line, const char **Pos) +{ + if (Item->Current == NULL) + NewMailQEntry(Item); + if (Item->Current->Recipient == NULL) + Item->Current->Recipient = NewStrBufPlain(NULL, StrLength(Line)); + StrBufExtract_NextToken(Item->Current->Recipient, Line, Pos, '|'); + Item->Current->Status = StrBufExtractNext_int(Line, Pos, '|'); + StrBufExtract_NextToken(Item->Current->StatusMessage, Line, Pos, '|'); + Item->Current = NULL; // TODO: is this always right? +} + + +void QItem_Handle_retry(OneQueItem *Item, StrBuf *Line, const char **Pos) +{ + if (Item->Current == NULL) + NewMailQEntry(Item); + if (Item->Current->Attempts[Item->Current->nAttempts].retry != 0) + Item->Current->nAttempts++; + if (Item->Current->nAttempts > MaxAttempts) { + Item->FailNow = 1; + return; + } + Item->Current->Attempts[Item->Current->nAttempts].retry = StrBufExtractNext_int(Line, Pos, '|'); +} + +void QItem_Handle_Attempted(OneQueItem *Item, StrBuf *Line, const char **Pos) +{ + if (Item->Current == NULL) + NewMailQEntry(Item); + if (Item->Current->Attempts[Item->Current->nAttempts].when != 0) + Item->Current->nAttempts++; + if (Item->Current->nAttempts > MaxAttempts) { + Item->FailNow = 1; + return; + } + + Item->Current->Attempts[Item->Current->nAttempts].when = StrBufExtractNext_int(Line, Pos, '|'); + if (Item->Current->Attempts[Item->Current->nAttempts].when > Item->LastAttempt.when) + { + Item->LastAttempt.when = Item->Current->Attempts[Item->Current->nAttempts].when; + Item->LastAttempt.retry = Item->Current->Attempts[Item->Current->nAttempts].retry * 2; + if (Item->LastAttempt.retry > SMTP_RETRY_MAX) + Item->LastAttempt.retry = SMTP_RETRY_MAX; + } +} + + + +/** + * this one has to have the context for loading the message via the redirect buffer... + */ +StrBuf *smtp_load_msg(OneQueItem *MyQItem, int n) +{ + CitContext *CCC=CC; + StrBuf *SendMsg; + + CCC->redirect_buffer = NewStrBufPlain(NULL, SIZ); + CtdlOutputMsg(MyQItem->MessageID, MT_RFC822, HEADERS_ALL, 0, 1, NULL, (ESC_DOT|SUPPRESS_ENV_TO) ); + SendMsg = CCC->redirect_buffer; + CCC->redirect_buffer = NULL; + if ((StrLength(SendMsg) > 0) && + ChrPtr(SendMsg)[StrLength(SendMsg) - 1] != '\n') { + CtdlLogPrintf(CTDL_WARNING, + "SMTP client[%ld]: Possible problem: message did not " + "correctly terminate. (expecting 0x10, got 0x%02x)\n", + MsgCount, //yes uncool, but best choice here... + ChrPtr(SendMsg)[StrLength(SendMsg) - 1] ); + StrBufAppendBufPlain(SendMsg, HKEY("\r\n"), 0); + } + return SendMsg; +} + + + +/* + * smtp_do_procmsg() + * + * Called by smtp_do_queue() to handle an individual message. + */ +void smtp_do_procmsg(long msgnum, void *userdata) { + struct CtdlMessage *msg = NULL; + char *instr = NULL; + StrBuf *PlainQItem; + OneQueItem *MyQItem; + char *pch; + HashPos *It; + void *vQE; + long len; + const char *Key; + + CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum); + ///strcpy(envelope_from, ""); + + msg = CtdlFetchMessage(msgnum, 1); + if (msg == NULL) { + CtdlLogPrintf(CTDL_ERR, "SMTP Queue: tried %ld but no such message!\n", msgnum); + return; + } + + pch = instr = msg->cm_fields['M']; + + /* Strip out the headers (no not amd any other non-instruction) line */ + while (pch != NULL) { + pch = strchr(pch, '\n'); + if ((pch != NULL) && (*(pch + 1) == '\n')) { + instr = pch + 2; + pch = NULL; + } + } + PlainQItem = NewStrBufPlain(instr, -1); + CtdlFreeMessage(msg); + MyQItem = DeserializeQueueItem(PlainQItem, msgnum); + FreeStrBuf(&PlainQItem); + + if (MyQItem == NULL) { + CtdlLogPrintf(CTDL_ERR, "SMTP Queue: Msg No %ld: already in progress!\n", msgnum); + return; /* s.b. else is already processing... */ + } + + /* + * Postpone delivery if we've already tried recently. + * / + if (((time(NULL) - MyQItem->LastAttempt.when) < MyQItem->LastAttempt.retry) && (run_queue_now == 0)) { + CtdlLogPrintf(CTDL_DEBUG, "SMTP client: Retry time not yet reached.\n"); + + It = GetNewHashPos(MyQItem->MailQEntries, 0); + citthread_mutex_lock(&ActiveQItemsLock); + { + GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); + DeleteEntryFromHash(ActiveQItems, It); + } + citthread_mutex_unlock(&ActiveQItemsLock); + ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? + DeleteHashPos(&It); + return; + }// TODO: reenable me.*/ + + /* + * Bail out if there's no actual message associated with this + */ + if (MyQItem->MessageID < 0L) { + CtdlLogPrintf(CTDL_ERR, "SMTP Queue: no 'msgid' directive found!\n"); + It = GetNewHashPos(MyQItem->MailQEntries, 0); + citthread_mutex_lock(&ActiveQItemsLock); + { + GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); + DeleteEntryFromHash(ActiveQItems, It); + } + citthread_mutex_unlock(&ActiveQItemsLock); + DeleteHashPos(&It); + ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? + return; + } + + It = GetNewHashPos(MyQItem->MailQEntries, 0); + while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)) + { + MailQEntry *ThisItem = vQE; + CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Task: <%s> %d\n", ChrPtr(ThisItem->Recipient), ThisItem->Active); + } + DeleteHashPos(&It); + + CountActiveQueueEntries(MyQItem); + if (MyQItem->ActiveDeliveries > 0) + { + int n = MsgCount++; + int i = 1; + StrBuf *Msg = smtp_load_msg(MyQItem, n); + It = GetNewHashPos(MyQItem->MailQEntries, 0); + while ((i <= MyQItem->ActiveDeliveries) && + (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))) + { + MailQEntry *ThisItem = vQE; + if (ThisItem->Active == 1) { + if (i > 1) n = MsgCount++; + CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Trying <%s>\n", ChrPtr(ThisItem->Recipient)); + smtp_try(MyQItem, ThisItem, Msg, (i == MyQItem->ActiveDeliveries), n); + i++; + } + } + DeleteHashPos(&It); + } + else + { + It = GetNewHashPos(MyQItem->MailQEntries, 0); + citthread_mutex_lock(&ActiveQItemsLock); + { + GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); + DeleteEntryFromHash(ActiveQItems, It); + } + citthread_mutex_unlock(&ActiveQItemsLock); + DeleteHashPos(&It); + ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? + +// TODO: bounce & delete? + + } +} + + + +/* + * smtp_queue_thread() + * + * Run through the queue sending out messages. + */ +void *smtp_queue_thread(void *arg) { + int num_processed = 0; + struct CitContext smtp_queue_CC; + + CtdlThreadSleep(10); + + CtdlFillSystemContext(&smtp_queue_CC, "SMTP Send"); + citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); + CtdlLogPrintf(CTDL_DEBUG, "smtp_queue_thread() initializing\n"); + + while (!CtdlThreadCheckStop()) { + + CtdlLogPrintf(CTDL_INFO, "SMTP client: processing outbound queue\n"); + + if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) { + CtdlLogPrintf(CTDL_ERR, "Cannot find room <%s>\n", SMTP_SPOOLOUT_ROOM); + } + else { + num_processed = CtdlForEachMessage(MSGS_ALL, 0L, NULL, SPOOLMIME, NULL, smtp_do_procmsg, NULL); + } + CtdlLogPrintf(CTDL_INFO, "SMTP client: queue run completed; %d messages processed\n", num_processed); + CtdlThreadSleep(60); + } + + CtdlClearSystemContext(); + return(NULL); +} + + + +/* + * Initialize the SMTP outbound queue + */ +void smtp_init_spoolout(void) { + struct ctdlroom qrbuf; + + /* + * Create the room. This will silently fail if the room already + * exists, and that's perfectly ok, because we want it to exist. + */ + CtdlCreateRoom(SMTP_SPOOLOUT_ROOM, 3, "", 0, 1, 0, VIEW_MAILBOX); + + /* + * Make sure it's set to be a "system room" so it doesn't show up + * in the nown rooms list for Aides. + */ + if (CtdlGetRoomLock(&qrbuf, SMTP_SPOOLOUT_ROOM) == 0) { + qrbuf.QRflags2 |= QR2_SYSTEM; + CtdlPutRoomLock(&qrbuf); + } +} + + + + +/*****************************************************************************/ +/* SMTP UTILITY COMMANDS */ +/*****************************************************************************/ + +void cmd_smtp(char *argbuf) { + char cmd[64]; + char node[256]; + char buf[1024]; + int i; + int num_mxhosts; + + if (CtdlAccessCheck(ac_aide)) return; + + extract_token(cmd, argbuf, 0, '|', sizeof cmd); + + if (!strcasecmp(cmd, "mx")) { + extract_token(node, argbuf, 1, '|', sizeof node); + num_mxhosts = getmx(buf, node); + cprintf("%d %d MX hosts listed for %s\n", + LISTING_FOLLOWS, num_mxhosts, node); + for (i=0; iredirect_buffer = NewStrBufPlain(NULL, SIZ); - CtdlOutputMsg(omsgid, MT_RFC822, HEADERS_ALL, 0, 1, NULL, 0); - StrBufAppendBuf(BounceMB, CC->redirect_buffer, 0); - FreeStrBuf(&CC->redirect_buffer); + if (OMsgTxt == NULL) { + CC->redirect_buffer = NewStrBufPlain(NULL, SIZ); + CtdlOutputMsg(omsgid, MT_RFC822, HEADERS_ALL, 0, 1, NULL, 0); + StrBufAppendBuf(BounceMB, CC->redirect_buffer, 0); + FreeStrBuf(&CC->redirect_buffer); + } + else { + StrBufAppendBuf(BounceMB, OMsgTxt, 0); + } } /* Close the multipart MIME scope */ diff --git a/citadel/modules/smtp/smtp_util.h b/citadel/modules/smtp/smtp_util.h index 46c783ec8..7e97f6e96 100644 --- a/citadel/modules/smtp/smtp_util.h +++ b/citadel/modules/smtp/smtp_util.h @@ -54,4 +54,4 @@ typedef struct _citsmtp { /* Information about the current session */ #define SMTP ((citsmtp *)CC->session_specific_data) -void smtp_do_bounce(char *instr); +void smtp_do_bounce(char *instr, StrBuf *OMsgTxt); diff --git a/citadel/modules/smtp/smtpqueue.h b/citadel/modules/smtp/smtpqueue.h new file mode 100644 index 000000000..a7b97cd90 --- /dev/null +++ b/citadel/modules/smtp/smtpqueue.h @@ -0,0 +1,37 @@ +/*****************************************************************************/ +/* SMTP CLIENT (Queue Management) STUFF */ +/*****************************************************************************/ + +#define MaxAttempts 15 +typedef struct _delivery_attempt { + time_t when; + time_t retry; +}DeliveryAttempt; + +typedef struct _mailq_entry { + DeliveryAttempt Attempts[MaxAttempts]; + int nAttempts; + StrBuf *Recipient; + StrBuf *StatusMessage; + int Status; + int n; + int Active; +}MailQEntry; + +typedef struct queueitem { + long MessageID; + long QueMsgID; + int FailNow; + HashList *MailQEntries; + MailQEntry *Current; /* copy of the currently parsed item in the MailQEntries list; if null add a new one. */ + DeliveryAttempt LastAttempt; + long ActiveDeliveries; + StrBuf *EnvelopeFrom; + StrBuf *BounceTo; +} OneQueItem; +typedef void (*QItemHandler)(OneQueItem *Item, StrBuf *Line, const char **Pos); + +int DecreaseQReference(OneQueItem *MyQItem); +void RemoveQItem(OneQueItem *MyQItem); +int CountActiveQueueEntries(OneQueItem *MyQItem); +StrBuf *SerializeQueueItem(OneQueItem *MyQItem); -- 2.30.2