From c12b418a64b44be9d08cae0e5dd25c988a522b90 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Fri, 21 Jan 2011 00:20:07 +0100 Subject: [PATCH] libev migration: reinstantiate MX-Relay; unfinished. --- citadel/event_client.c | 43 +--- citadel/event_client.h | 15 +- .../modules/eventclient/serv_eventclient.c | 2 +- citadel/modules/smtp/serv_smtpeventclient.c | 199 ++++++++++++------ citadel/modules/smtp/serv_smtpqueue.c | 114 +++++++++- citadel/modules/smtp/smtpqueue.h | 16 ++ 6 files changed, 273 insertions(+), 116 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index f0078fed1..6316a1c4c 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -71,13 +71,13 @@ extern HashList *InboundEventQueue; extern struct ev_loop *event_base; -int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB) +int QueueEventContext(AsyncIO *IO, IO_CallBack CB) { IOAddHandler *h; int i; h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); - h->Ctx = Ctx; + h->IO = IO; h->EvAttch = CB; citthread_mutex_lock(&EventQueueMutex); @@ -325,17 +325,15 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout) { - struct sockaddr_in saddr; int fdflags; int rc = -1; IO->SendBuf.fd = IO->RecvBuf.fd = - IO->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); -/* -IO->curr_ai->ai_family, - IO->curr_ai->ai_socktype, - IO->curr_ai->ai_protocol); -*/ + IO->sock = socket( + (IO->IP6)?PF_INET6:PF_INET, + SOCK_STREAM, + IPPROTO_TCP); + if (IO->sock < 0) { CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); @@ -372,19 +370,10 @@ IO->curr_ai->ai_family, ev_timer_init(&IO->rw_timeout, IO_Timout_callback, first_rw_timeout, 0); IO->rw_timeout.data = IO; - memset( (struct sockaddr_in *)&saddr, '\0', sizeof( saddr ) ); - - memcpy(&saddr.sin_addr, - IO->HEnt->h_addr_list[0], - sizeof(struct in_addr)); -// saddr.sin_addr.s_addr = inet_addr("127.0.0.1"); - - saddr.sin_family = AF_INET; - saddr.sin_port = htons(IO->dport); rc = connect(IO->sock, - (struct sockaddr *) &saddr, -/// TODO: ipv6?? (IO->HEnt->h_addrtype == AF_INET6)? - /* sizeof(in6_addr):*/ + (struct sockaddr *) &IO->Addr, + (IO->IP6)? ///HEnt->h_addrtype == AF_INET6)? + sizeof(struct in6_addr): sizeof(struct sockaddr_in)); if (rc >= 0){ //// freeaddrinfo(res); @@ -417,23 +406,11 @@ void SetNextTimeout(AsyncIO *IO, double timeout) void InitEventIO(AsyncIO *IO, void *pData, - IO_CallBack ReadDone, - IO_CallBack SendDone, - IO_CallBack Terminate, - IO_CallBack Timeout, - IO_CallBack ConnFail, - IO_LineReaderCallback LineReader, double conn_timeout, double first_rw_timeout, int ReadFirst) { IO->Data = pData; - IO->SendDone = SendDone; - IO->ReadDone = ReadDone; - IO->Terminate = Terminate; - IO->LineReader = LineReader; - IO->ConnFail = ConnFail; - IO->Timeout = Timeout; if (ReadFirst) { IO->NextState = eReadMessage; diff --git a/citadel/event_client.h b/citadel/event_client.h index 208a71661..7fe12d1a4 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -15,7 +15,6 @@ typedef enum _eNextState { eAbort }eNextState; -typedef int (*EventContextAttach)(void *Data); typedef eNextState (*IO_CallBack)(AsyncIO *IO); typedef eReadState (*IO_LineReaderCallback)(AsyncIO *IO); typedef void (*ParseDNSAnswerCb)(AsyncIO*, unsigned char*, int); @@ -32,6 +31,8 @@ struct AsyncIO { /* connection related */ int IP6; struct hostent *HEnt; + struct sockaddr_in6 Addr; + int sock; unsigned short dport; eNextState NextState; @@ -77,23 +78,17 @@ struct AsyncIO { }; typedef struct _IOAddHandler { - void *Ctx; - EventContextAttach EvAttch; + AsyncIO *IO; + IO_CallBack EvAttch; }IOAddHandler; void FreeAsyncIOContents(AsyncIO *IO); -int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB); +int QueueEventContext(AsyncIO *IO, IO_CallBack CB); int ShutDownEventQueue(void); void InitEventIO(AsyncIO *IO, void *pData, - IO_CallBack ReadDone, - IO_CallBack SendDone, - IO_CallBack Terminate, - IO_CallBack Timeout, - IO_CallBack ConnFail, - IO_LineReaderCallback LineReader, double conn_timeout, double first_rw_timeout, int ReadFirst); diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index a858048da..ec6af084e 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -97,7 +97,7 @@ static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int reve while (GetNextHashPos(q, It, &len, &Key, &v)) { IOAddHandler *h = v; - h->EvAttch(h->Ctx); + h->EvAttch(h->IO); } DeleteHashPos(&It); DeleteHashContent(&q); diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index ff99668bd..919f97f60 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -133,10 +133,7 @@ const double SMTP_C_SendTimeouts[eMaxSMTPC] = { 900., /* end of body... */ 30. /* QUIT */ }; -/* -const long SMTP_C_SendTimeouts[eMaxSMTPC] = { -}; */ static const ConstStr ReadErrors[eMaxSMTPC] = { {HKEY("Connection broken during SMTP conversation")}, {HKEY("Connection broken during SMTP EHLO")}, @@ -165,8 +162,8 @@ typedef struct _stmp_out_msg { struct hostent *OneMX; - char mx_user[1024]; - char mx_pass[1024]; + ParsedURL *Relay; + ParsedURL *pCurrRelay; StrBuf *msgtext; char *envelope_from; char user[1024]; @@ -259,18 +256,50 @@ void FinalizeMessageSend(SmtpOutMsg *Msg) - -void get_one_mx_host_ip_done(void *Ctx, - int status, - int timeouts, - struct hostent *hostent) +eNextState mx_connect_relay_ip(AsyncIO *IO) { - AsyncIO *IO = Ctx; + SmtpOutMsg *SendMsg = IO->Data; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); - if ((status == ARES_SUCCESS) && (hostent != NULL) ) { + + IO->IP6 = SendMsg->pCurrRelay->af == AF_INET6; + + if (SendMsg->pCurrRelay->Port != 0) + IO->dport = SendMsg->pCurrRelay->Port; + + memset(&IO->Addr, 0, sizeof(struct in6_addr)); + if (IO->IP6) { + memcpy(&IO->Addr.sin6_addr.s6_addr, + &SendMsg->pCurrRelay->Addr, + sizeof(struct in6_addr)); + + IO->Addr.sin6_family = AF_INET6; + IO->Addr.sin6_port = htons(IO->dport); + } + else { + struct sockaddr_in *addr = (struct sockaddr_in*) &IO->Addr; + /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ + memcpy(&addr->sin_addr, + &SendMsg->pCurrRelay->Addr, + sizeof(struct in_addr)); + + addr->sin_family = AF_INET; + addr->sin_port = htons(IO->dport); + } + + + +/* + + SendMsg->pCurrRelay-> + if ((SendMsg->pCurrRelay != NULL) && (SendMsg->pCurrRelay->IsIP)) { + connect = 1; + + } unsigned long psaddr; - // TODO: IPV6 + + memcpy(&psaddr, hostent->h_addr_list[0], sizeof(psaddr)); psaddr = ntohl(psaddr); @@ -294,53 +323,67 @@ void get_one_mx_host_ip_done(void *Ctx, (psaddr >> 0) & 0xFF, SendMsg->IO.dport); + + } +*/ + /// TODO: else fail! + InitEventIO(IO, SendMsg, + SMTP_C_ConnTimeout, + SMTP_C_ReadTimeouts[0], + 1); +} + +void get_one_mx_host_ip_done(void *Ctx, + int status, + int timeouts, + struct hostent *hostent) +{ + AsyncIO *IO = (AsyncIO *) Ctx; + SmtpOutMsg *SendMsg = IO->Data; + + if ((status == ARES_SUCCESS) && (hostent != NULL) ) { + + IO->IP6 = hostent->h_addrtype == AF_INET6; + + memset(&IO->Addr, 0, sizeof(struct in6_addr)); + if (IO->IP6) { + memcpy(&IO->Addr.sin6_addr.s6_addr, + IO->HEnt->h_addr_list[0], + sizeof(struct in6_addr)); + + IO->Addr.sin6_family = hostent->h_addrtype; + IO->Addr.sin6_port = htons(IO->dport); + } + else { + struct sockaddr_in *addr = (struct sockaddr_in*) &IO->Addr; + /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ + memcpy(&addr->sin_addr, + IO->HEnt->h_addr_list[0], + sizeof(struct in_addr)); + + addr->sin_family = hostent->h_addrtype; + addr->sin_port = htons(IO->dport); + + } SendMsg->IO.HEnt = hostent; InitEventIO(IO, SendMsg, - SMTP_C_DispatchReadDone, - SMTP_C_DispatchWriteDone, - SMTP_C_Terminate, - SMTP_C_Timeout, - SMTP_C_ConnFail, - SMTP_C_ReadServerStatus, SMTP_C_ConnTimeout, SMTP_C_ReadTimeouts[0], 1); - } } const unsigned short DefaultMXPort = 25; -void get_one_mx_host_ip(SmtpOutMsg *SendMsg) +eNextState get_one_mx_host_ip(AsyncIO *IO) { + SmtpOutMsg * SendMsg = IO->Data; + //char *endpart; //char buf[SIZ]; CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); SendMsg->IO.dport = DefaultMXPort; - -/* TODO: Relay! - *SendMsg->mx_user = '\0'; - *SendMsg->mx_pass = '\0'; - if (num_tokens(buf, '@') > 1) { - strcpy (SendMsg->mx_user, buf); - endpart = strrchr(SendMsg->mx_user, '@'); - *endpart = '\0'; - strcpy (SendMsg->mx_host, endpart + 1); - endpart = strrchr(SendMsg->mx_user, ':'); - if (endpart != NULL) { - strcpy(SendMsg->mx_pass, endpart+1); - *endpart = '\0'; - } - - endpart = strrchr(SendMsg->mx_host, ':'); - if (endpart != 0){ - *endpart = '\0'; - strcpy(SendMsg->mx_port, endpart + 1); - } - } - else -*/ SendMsg->mx_host = SendMsg->CurrMX->host; SendMsg->CurrMX = SendMsg->CurrMX->next; @@ -371,14 +414,14 @@ eNextState smtp_resolve_mx_done(AsyncIO *IO) SendMsg->CurrMX = SendMsg->AllMX = IO->VParsedDNSReply; //// TODO: should we remove the current ares context??? - get_one_mx_host_ip(SendMsg); + get_one_mx_host_ip(IO); return 0; } -int resolve_mx_records(void *Ctx) +eNextState resolve_mx_records(AsyncIO *IO) { - SmtpOutMsg * SendMsg = Ctx; + SmtpOutMsg * SendMsg = IO->Data; CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); @@ -490,20 +533,41 @@ void smtp_try(OneQueItem *MyQItem, SendMsg = (SmtpOutMsg *) malloc(sizeof(SmtpOutMsg)); memset(SendMsg, 0, sizeof(SmtpOutMsg)); - SendMsg->IO.sock = (-1); - SendMsg->n = MsgCount++; - SendMsg->MyQEntry = MyQEntry; - SendMsg->MyQItem = MyQItem; - SendMsg->IO.Data = SendMsg; - if (KeepMsgText) - SendMsg->msgtext = MsgText; - else + SendMsg->IO.sock = (-1); + SendMsg->n = MsgCount++; + SendMsg->MyQEntry = MyQEntry; + SendMsg->MyQItem = MyQItem; + SendMsg->pCurrRelay = MyQItem->URL; + + SendMsg->IO.Data = SendMsg; + SendMsg->IO.SendDone = SMTP_C_DispatchWriteDone; + SendMsg->IO.ReadDone = SMTP_C_DispatchReadDone; + SendMsg->IO.Terminate = SMTP_C_Terminate; + SendMsg->IO.LineReader = SMTP_C_ReadServerStatus; + SendMsg->IO.ConnFail = SMTP_C_ConnFail; + SendMsg->IO.Timeout = SMTP_C_Timeout; + + if (KeepMsgText) { + SendMsg->msgtext = MsgText; + } + else { SendMsg->msgtext = NewStrBufDup(MsgText); + } if (smtp_resolve_recipients(SendMsg)) { - QueueEventContext(SendMsg, - &SendMsg->IO, - resolve_mx_records); + if (SendMsg->pCurrRelay == NULL) + QueueEventContext(&SendMsg->IO, + resolve_mx_records); + else { + if (SendMsg->pCurrRelay->IsIP) { + QueueEventContext(&SendMsg->IO, + mx_connect_relay_ip); + } + else { + QueueEventContext(&SendMsg->IO, + get_one_mx_host_ip); + } + } } else { if ((SendMsg==NULL) || @@ -555,7 +619,8 @@ eNextState SMTPC_read_EHLO_reply(SmtpOutMsg *SendMsg) if (SMTP_IS_STATE('2')) { SendMsg->State ++; - if (IsEmptyStr(SendMsg->mx_user)) + + if (SendMsg->pCurrRelay->User == NULL) SendMsg->State ++; /* Skip auth... */ } /* else we fall back to 'helo' */ @@ -581,7 +646,7 @@ eNextState SMTPC_read_HELO_reply(SmtpOutMsg *SendMsg) else SMTP_VERROR(5); } - if (!IsEmptyStr(SendMsg->mx_user)) + if (SendMsg->pCurrRelay->User == NULL) SendMsg->State ++; /* Skip auth... */ return eSendReply; } @@ -593,16 +658,15 @@ eNextState SMTPC_send_auth(SmtpOutMsg *SendMsg) /* Do an AUTH command if necessary */ sprintf(buf, "%s%c%s%c%s", - SendMsg->mx_user, '\0', - SendMsg->mx_user, '\0', - SendMsg->mx_pass); + SendMsg->pCurrRelay->User, '\0', + SendMsg->pCurrRelay->User, '\0', + SendMsg->pCurrRelay->Pass); CtdlEncodeBase64(encoded, buf, - strlen(SendMsg->mx_user) + - strlen(SendMsg->mx_user) + - strlen(SendMsg->mx_pass) + 2, 0); + strlen(SendMsg->pCurrRelay->User) * 2 + + strlen(SendMsg->pCurrRelay->Pass) + 2, 0); StrBufPrintf(SendMsg->IO.SendBuf.Buf, "AUTH PLAIN %s\r\n", encoded); - + SMTP_DBG_SEND(); return eReadMessage; } @@ -909,7 +973,6 @@ eReadState SMTP_C_ReadServerStatus(AsyncIO *IO) return Finished; } - #endif CTDL_MODULE_INIT(smtp_eventclient) { diff --git a/citadel/modules/smtp/serv_smtpqueue.c b/citadel/modules/smtp/serv_smtpqueue.c index eb4d3f17b..4115cb522 100644 --- a/citadel/modules/smtp/serv_smtpqueue.c +++ b/citadel/modules/smtp/serv_smtpqueue.c @@ -100,7 +100,8 @@ void smtp_try(OneQueItem *MyQItem, MailQEntry *MyQEntry, StrBuf *MsgText, int KeepMsgText, /* KeepMsgText allows us to use MsgText as ours. */ - int MsgCount); + int MsgCount, + ParsedURL *RelayUrls); void smtp_evq_cleanup(void) @@ -588,6 +589,80 @@ void smtpq_do_bounce(OneQueItem *MyQItem, StrBuf *OMsgTxt) CtdlLogPrintf(CTDL_DEBUG, "Done processing bounces\n"); } + + +int ParseURL(ParsedURL **Url, StrBuf *UrlStr, short DefaultPort) +{ + const char *pch, *pStartHost, *pEndHost, *pPort, *pCredEnd, *pUserEnd; + ParsedURL *url = (ParsedURL *)malloc(sizeof(ParsedURL)); + memset(url, 0, sizeof(ParsedURL)); + + url->af = AF_INET; + url->Port = DefaultPort; + /* + * http://username:passvoid@[ipv6]:port/url + */ + url->URL = NewStrBufDup(UrlStr); + pStartHost = pch = ChrPtr(url->URL); + url->LocalPart = strchr(pch, '/'); + if (url->LocalPart != NULL) { + if ((*(url->LocalPart + 1) == '/') && + (*(url->LocalPart + 2) == ':')) { /* TODO: find default port for this protocol... */ + pStartHost = url->LocalPart + 3; + url->LocalPart = strchr(pStartHost, '/'); + } + } + if (url->LocalPart == NULL) { + url->LocalPart = pch + StrLength(UrlStr); + } + + pCredEnd = strchr(pch, '@'); + if (pCredEnd >= url->LocalPart) + pCredEnd = NULL; + if (pCredEnd != NULL) + { + url->User = pStartHost; + pStartHost = pCredEnd + 1; + pUserEnd = strchr(url->User, ':'); + + if (pUserEnd > pCredEnd) + pUserEnd = pCredEnd; + else { + url->Pass = pUserEnd + 1; + } + StrBufPeek(UrlStr, pUserEnd, 0, '\0'); + StrBufPeek(UrlStr, pCredEnd, 0, '\0'); + } + + pPort = NULL; + if (*pStartHost == '[') { + pStartHost ++; + pEndHost = strchr(pStartHost, ']'); + if (pEndHost == NULL) { + free(url); + return 0; /* invalid syntax, no ipv6 */ + } + if (*(pEndHost + 1) == ':') + pPort = pEndHost + 2; + url->af = AF_INET6; + } + else { + pPort = strchr(pStartHost, ':'); + if (pPort != NULL) + pPort ++; + } + if (pPort != NULL) + url->Port = atol(pPort); + url->IsIP = inet_pton(url->af, pStartHost, &url->Addr); + return 1; +} +/* +{ + + if (threadding) + n_smarthosts = get_hosts(char *mxbuf, char *rectype); +} +*/ /* * smtp_do_procmsg() * @@ -603,7 +678,11 @@ void smtp_do_procmsg(long msgnum, void *userdata) { void *vQE; long len; const char *Key; - + int nRelays = 0; + ParsedURL *RelayUrls = NULL; + int HaveBuffers = 0; + StrBuf *Msg =NULL; + CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum); ///strcpy(envelope_from, ""); @@ -668,6 +747,27 @@ void smtp_do_procmsg(long msgnum, void *userdata) { return; } + { + char mxbuf[SIZ]; + ParsedURL **Url = &RelayUrls; ///&MyQItem->Relay; + nRelays = get_hosts(mxbuf, "smarthost"); + if (nRelays > 0) { + StrBuf *All; + StrBuf *One; + const char *Pos = NULL; + All = NewStrBufPlain(mxbuf, -1); + One = NewStrBufPlain(NULL, StrLength(All) + 1); + + while (Pos != StrBufNOTNULL) { + StrBufExtract_NextToken(One, All, &Pos, '|'); + if (!ParseURL(Url, One, 25)) + CtdlLogPrintf(CTDL_DEBUG, "Failed to parse: %s\n", ChrPtr(One)); + else + Url = &(*Url)->Next; + } + } + } + It = GetNewHashPos(MyQItem->MailQEntries, 0); while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)) { @@ -681,16 +781,18 @@ void smtp_do_procmsg(long msgnum, void *userdata) { { int n = MsgCount++; int i = 1; - StrBuf *Msg = smtp_load_msg(MyQItem, n); + Msg = smtp_load_msg(MyQItem, n); It = GetNewHashPos(MyQItem->MailQEntries, 0); while ((i <= MyQItem->ActiveDeliveries) && (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))) { MailQEntry *ThisItem = vQE; if (ThisItem->Active == 1) { + int KeepBuffers = (i == MyQItem->ActiveDeliveries); if (i > 1) n = MsgCount++; CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Trying <%s>\n", ChrPtr(ThisItem->Recipient)); - smtp_try(MyQItem, ThisItem, Msg, (i == MyQItem->ActiveDeliveries), n); + smtp_try(MyQItem, ThisItem, Msg, KeepBuffers, n, RelayUrls); + if (KeepBuffers) HaveBuffers = 1; i++; } } @@ -711,6 +813,10 @@ void smtp_do_procmsg(long msgnum, void *userdata) { // TODO: bounce & delete? } + if (!HaveBuffers) { + FreeStrBuf (&Msg); +// TODO : free RelayUrls + } } diff --git a/citadel/modules/smtp/smtpqueue.h b/citadel/modules/smtp/smtpqueue.h index 023b479ca..d05435189 100644 --- a/citadel/modules/smtp/smtpqueue.h +++ b/citadel/modules/smtp/smtpqueue.h @@ -2,6 +2,21 @@ /* SMTP CLIENT (Queue Management) STUFF */ /*****************************************************************************/ +typedef struct ParsedURL ParsedURL; +struct ParsedURL { + StrBuf *URL; + unsigned Port; + const char *User; + const char *Pass; + const char *LocalPart; + int IsIP; + int af; + struct hostent *HEnt; + struct in6_addr Addr; + ParsedURL *Next; +}; + + #define MaxAttempts 15 typedef struct _delivery_attempt { time_t when; @@ -38,6 +53,7 @@ typedef struct queueitem { long ActiveDeliveries; StrBuf *EnvelopeFrom; StrBuf *BounceTo; + ParsedURL *URL; } OneQueItem; typedef void (*QItemHandler)(OneQueItem *Item, StrBuf *Line, const char **Pos); -- 2.30.2