extern struct ev_loop *event_base;
-int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB)
+int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
{
IOAddHandler *h;
int i;
h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
- h->Ctx = Ctx;
+ h->IO = IO;
h->EvAttch = CB;
citthread_mutex_lock(&EventQueueMutex);
int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout)
{
- struct sockaddr_in saddr;
int fdflags;
int rc = -1;
IO->SendBuf.fd = IO->RecvBuf.fd =
- IO->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
-/*
-IO->curr_ai->ai_family,
- IO->curr_ai->ai_socktype,
- IO->curr_ai->ai_protocol);
-*/
+ IO->sock = socket(
+ (IO->IP6)?PF_INET6:PF_INET,
+ SOCK_STREAM,
+ IPPROTO_TCP);
+
if (IO->sock < 0) {
CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno));
StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno));
ev_timer_init(&IO->rw_timeout, IO_Timout_callback, first_rw_timeout, 0);
IO->rw_timeout.data = IO;
- memset( (struct sockaddr_in *)&saddr, '\0', sizeof( saddr ) );
-
- memcpy(&saddr.sin_addr,
- IO->HEnt->h_addr_list[0],
- sizeof(struct in_addr));
-// saddr.sin_addr.s_addr = inet_addr("127.0.0.1");
-
- saddr.sin_family = AF_INET;
- saddr.sin_port = htons(IO->dport);
rc = connect(IO->sock,
- (struct sockaddr *) &saddr,
-/// TODO: ipv6?? (IO->HEnt->h_addrtype == AF_INET6)?
- /* sizeof(in6_addr):*/
+ (struct sockaddr *) &IO->Addr,
+ (IO->IP6)? ///HEnt->h_addrtype == AF_INET6)?
+ sizeof(struct in6_addr):
sizeof(struct sockaddr_in));
if (rc >= 0){
//// freeaddrinfo(res);
void InitEventIO(AsyncIO *IO,
void *pData,
- IO_CallBack ReadDone,
- IO_CallBack SendDone,
- IO_CallBack Terminate,
- IO_CallBack Timeout,
- IO_CallBack ConnFail,
- IO_LineReaderCallback LineReader,
double conn_timeout,
double first_rw_timeout,
int ReadFirst)
{
IO->Data = pData;
- IO->SendDone = SendDone;
- IO->ReadDone = ReadDone;
- IO->Terminate = Terminate;
- IO->LineReader = LineReader;
- IO->ConnFail = ConnFail;
- IO->Timeout = Timeout;
if (ReadFirst) {
IO->NextState = eReadMessage;
eAbort
}eNextState;
-typedef int (*EventContextAttach)(void *Data);
typedef eNextState (*IO_CallBack)(AsyncIO *IO);
typedef eReadState (*IO_LineReaderCallback)(AsyncIO *IO);
typedef void (*ParseDNSAnswerCb)(AsyncIO*, unsigned char*, int);
/* connection related */
int IP6;
struct hostent *HEnt;
+ struct sockaddr_in6 Addr;
+
int sock;
unsigned short dport;
eNextState NextState;
};
typedef struct _IOAddHandler {
- void *Ctx;
- EventContextAttach EvAttch;
+ AsyncIO *IO;
+ IO_CallBack EvAttch;
}IOAddHandler;
void FreeAsyncIOContents(AsyncIO *IO);
-int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB);
+int QueueEventContext(AsyncIO *IO, IO_CallBack CB);
int ShutDownEventQueue(void);
void InitEventIO(AsyncIO *IO,
void *pData,
- IO_CallBack ReadDone,
- IO_CallBack SendDone,
- IO_CallBack Terminate,
- IO_CallBack Timeout,
- IO_CallBack ConnFail,
- IO_LineReaderCallback LineReader,
double conn_timeout, double first_rw_timeout,
int ReadFirst);
while (GetNextHashPos(q, It, &len, &Key, &v))
{
IOAddHandler *h = v;
- h->EvAttch(h->Ctx);
+ h->EvAttch(h->IO);
}
DeleteHashPos(&It);
DeleteHashContent(&q);
900., /* end of body... */
30. /* QUIT */
};
-/*
-const long SMTP_C_SendTimeouts[eMaxSMTPC] = {
-}; */
static const ConstStr ReadErrors[eMaxSMTPC] = {
{HKEY("Connection broken during SMTP conversation")},
{HKEY("Connection broken during SMTP EHLO")},
struct hostent *OneMX;
- char mx_user[1024];
- char mx_pass[1024];
+ ParsedURL *Relay;
+ ParsedURL *pCurrRelay;
StrBuf *msgtext;
char *envelope_from;
char user[1024];
-
-void get_one_mx_host_ip_done(void *Ctx,
- int status,
- int timeouts,
- struct hostent *hostent)
+eNextState mx_connect_relay_ip(AsyncIO *IO)
{
- AsyncIO *IO = Ctx;
+
SmtpOutMsg *SendMsg = IO->Data;
+
CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
- if ((status == ARES_SUCCESS) && (hostent != NULL) ) {
+
+ IO->IP6 = SendMsg->pCurrRelay->af == AF_INET6;
+
+ if (SendMsg->pCurrRelay->Port != 0)
+ IO->dport = SendMsg->pCurrRelay->Port;
+
+ memset(&IO->Addr, 0, sizeof(struct in6_addr));
+ if (IO->IP6) {
+ memcpy(&IO->Addr.sin6_addr.s6_addr,
+ &SendMsg->pCurrRelay->Addr,
+ sizeof(struct in6_addr));
+
+ IO->Addr.sin6_family = AF_INET6;
+ IO->Addr.sin6_port = htons(IO->dport);
+ }
+ else {
+ struct sockaddr_in *addr = (struct sockaddr_in*) &IO->Addr;
+ /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
+ memcpy(&addr->sin_addr,
+ &SendMsg->pCurrRelay->Addr,
+ sizeof(struct in_addr));
+
+ addr->sin_family = AF_INET;
+ addr->sin_port = htons(IO->dport);
+ }
+
+
+
+/*
+
+ SendMsg->pCurrRelay->
+ if ((SendMsg->pCurrRelay != NULL) && (SendMsg->pCurrRelay->IsIP)) {
+ connect = 1;
+
+ }
unsigned long psaddr;
- // TODO: IPV6
+
+
memcpy(&psaddr, hostent->h_addr_list[0], sizeof(psaddr));
psaddr = ntohl(psaddr);
(psaddr >> 0) & 0xFF,
SendMsg->IO.dport);
+
+ }
+*/
+ /// TODO: else fail!
+ InitEventIO(IO, SendMsg,
+ SMTP_C_ConnTimeout,
+ SMTP_C_ReadTimeouts[0],
+ 1);
+}
+
+void get_one_mx_host_ip_done(void *Ctx,
+ int status,
+ int timeouts,
+ struct hostent *hostent)
+{
+ AsyncIO *IO = (AsyncIO *) Ctx;
+ SmtpOutMsg *SendMsg = IO->Data;
+
+ if ((status == ARES_SUCCESS) && (hostent != NULL) ) {
+
+ IO->IP6 = hostent->h_addrtype == AF_INET6;
+
+ memset(&IO->Addr, 0, sizeof(struct in6_addr));
+ if (IO->IP6) {
+ memcpy(&IO->Addr.sin6_addr.s6_addr,
+ IO->HEnt->h_addr_list[0],
+ sizeof(struct in6_addr));
+
+ IO->Addr.sin6_family = hostent->h_addrtype;
+ IO->Addr.sin6_port = htons(IO->dport);
+ }
+ else {
+ struct sockaddr_in *addr = (struct sockaddr_in*) &IO->Addr;
+ /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
+ memcpy(&addr->sin_addr,
+ IO->HEnt->h_addr_list[0],
+ sizeof(struct in_addr));
+
+ addr->sin_family = hostent->h_addrtype;
+ addr->sin_port = htons(IO->dport);
+
+ }
SendMsg->IO.HEnt = hostent;
InitEventIO(IO, SendMsg,
- SMTP_C_DispatchReadDone,
- SMTP_C_DispatchWriteDone,
- SMTP_C_Terminate,
- SMTP_C_Timeout,
- SMTP_C_ConnFail,
- SMTP_C_ReadServerStatus,
SMTP_C_ConnTimeout,
SMTP_C_ReadTimeouts[0],
1);
-
}
}
const unsigned short DefaultMXPort = 25;
-void get_one_mx_host_ip(SmtpOutMsg *SendMsg)
+eNextState get_one_mx_host_ip(AsyncIO *IO)
{
+ SmtpOutMsg * SendMsg = IO->Data;
+
//char *endpart;
//char buf[SIZ];
CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
SendMsg->IO.dport = DefaultMXPort;
-
-/* TODO: Relay!
- *SendMsg->mx_user = '\0';
- *SendMsg->mx_pass = '\0';
- if (num_tokens(buf, '@') > 1) {
- strcpy (SendMsg->mx_user, buf);
- endpart = strrchr(SendMsg->mx_user, '@');
- *endpart = '\0';
- strcpy (SendMsg->mx_host, endpart + 1);
- endpart = strrchr(SendMsg->mx_user, ':');
- if (endpart != NULL) {
- strcpy(SendMsg->mx_pass, endpart+1);
- *endpart = '\0';
- }
-
- endpart = strrchr(SendMsg->mx_host, ':');
- if (endpart != 0){
- *endpart = '\0';
- strcpy(SendMsg->mx_port, endpart + 1);
- }
- }
- else
-*/
SendMsg->mx_host = SendMsg->CurrMX->host;
SendMsg->CurrMX = SendMsg->CurrMX->next;
SendMsg->CurrMX = SendMsg->AllMX = IO->VParsedDNSReply;
//// TODO: should we remove the current ares context???
- get_one_mx_host_ip(SendMsg);
+ get_one_mx_host_ip(IO);
return 0;
}
-int resolve_mx_records(void *Ctx)
+eNextState resolve_mx_records(AsyncIO *IO)
{
- SmtpOutMsg * SendMsg = Ctx;
+ SmtpOutMsg * SendMsg = IO->Data;
CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
SendMsg = (SmtpOutMsg *) malloc(sizeof(SmtpOutMsg));
memset(SendMsg, 0, sizeof(SmtpOutMsg));
- SendMsg->IO.sock = (-1);
- SendMsg->n = MsgCount++;
- SendMsg->MyQEntry = MyQEntry;
- SendMsg->MyQItem = MyQItem;
- SendMsg->IO.Data = SendMsg;
- if (KeepMsgText)
- SendMsg->msgtext = MsgText;
- else
+ SendMsg->IO.sock = (-1);
+ SendMsg->n = MsgCount++;
+ SendMsg->MyQEntry = MyQEntry;
+ SendMsg->MyQItem = MyQItem;
+ SendMsg->pCurrRelay = MyQItem->URL;
+
+ SendMsg->IO.Data = SendMsg;
+ SendMsg->IO.SendDone = SMTP_C_DispatchWriteDone;
+ SendMsg->IO.ReadDone = SMTP_C_DispatchReadDone;
+ SendMsg->IO.Terminate = SMTP_C_Terminate;
+ SendMsg->IO.LineReader = SMTP_C_ReadServerStatus;
+ SendMsg->IO.ConnFail = SMTP_C_ConnFail;
+ SendMsg->IO.Timeout = SMTP_C_Timeout;
+
+ if (KeepMsgText) {
+ SendMsg->msgtext = MsgText;
+ }
+ else {
SendMsg->msgtext = NewStrBufDup(MsgText);
+ }
if (smtp_resolve_recipients(SendMsg)) {
- QueueEventContext(SendMsg,
- &SendMsg->IO,
- resolve_mx_records);
+ if (SendMsg->pCurrRelay == NULL)
+ QueueEventContext(&SendMsg->IO,
+ resolve_mx_records);
+ else {
+ if (SendMsg->pCurrRelay->IsIP) {
+ QueueEventContext(&SendMsg->IO,
+ mx_connect_relay_ip);
+ }
+ else {
+ QueueEventContext(&SendMsg->IO,
+ get_one_mx_host_ip);
+ }
+ }
}
else {
if ((SendMsg==NULL) ||
if (SMTP_IS_STATE('2')) {
SendMsg->State ++;
- if (IsEmptyStr(SendMsg->mx_user))
+
+ if (SendMsg->pCurrRelay->User == NULL)
SendMsg->State ++; /* Skip auth... */
}
/* else we fall back to 'helo' */
else
SMTP_VERROR(5);
}
- if (!IsEmptyStr(SendMsg->mx_user))
+ if (SendMsg->pCurrRelay->User == NULL)
SendMsg->State ++; /* Skip auth... */
return eSendReply;
}
/* Do an AUTH command if necessary */
sprintf(buf, "%s%c%s%c%s",
- SendMsg->mx_user, '\0',
- SendMsg->mx_user, '\0',
- SendMsg->mx_pass);
+ SendMsg->pCurrRelay->User, '\0',
+ SendMsg->pCurrRelay->User, '\0',
+ SendMsg->pCurrRelay->Pass);
CtdlEncodeBase64(encoded, buf,
- strlen(SendMsg->mx_user) +
- strlen(SendMsg->mx_user) +
- strlen(SendMsg->mx_pass) + 2, 0);
+ strlen(SendMsg->pCurrRelay->User) * 2 +
+ strlen(SendMsg->pCurrRelay->Pass) + 2, 0);
StrBufPrintf(SendMsg->IO.SendBuf.Buf,
"AUTH PLAIN %s\r\n", encoded);
-
+
SMTP_DBG_SEND();
return eReadMessage;
}
return Finished;
}
-
#endif
CTDL_MODULE_INIT(smtp_eventclient)
{
MailQEntry *MyQEntry,
StrBuf *MsgText,
int KeepMsgText, /* KeepMsgText allows us to use MsgText as ours. */
- int MsgCount);
+ int MsgCount,
+ ParsedURL *RelayUrls);
void smtp_evq_cleanup(void)
CtdlLogPrintf(CTDL_DEBUG, "Done processing bounces\n");
}
+
+
+int ParseURL(ParsedURL **Url, StrBuf *UrlStr, short DefaultPort)
+{
+ const char *pch, *pStartHost, *pEndHost, *pPort, *pCredEnd, *pUserEnd;
+ ParsedURL *url = (ParsedURL *)malloc(sizeof(ParsedURL));
+ memset(url, 0, sizeof(ParsedURL));
+
+ url->af = AF_INET;
+ url->Port = DefaultPort;
+ /*
+ * http://username:passvoid@[ipv6]:port/url
+ */
+ url->URL = NewStrBufDup(UrlStr);
+ pStartHost = pch = ChrPtr(url->URL);
+ url->LocalPart = strchr(pch, '/');
+ if (url->LocalPart != NULL) {
+ if ((*(url->LocalPart + 1) == '/') &&
+ (*(url->LocalPart + 2) == ':')) { /* TODO: find default port for this protocol... */
+ pStartHost = url->LocalPart + 3;
+ url->LocalPart = strchr(pStartHost, '/');
+ }
+ }
+ if (url->LocalPart == NULL) {
+ url->LocalPart = pch + StrLength(UrlStr);
+ }
+
+ pCredEnd = strchr(pch, '@');
+ if (pCredEnd >= url->LocalPart)
+ pCredEnd = NULL;
+ if (pCredEnd != NULL)
+ {
+ url->User = pStartHost;
+ pStartHost = pCredEnd + 1;
+ pUserEnd = strchr(url->User, ':');
+
+ if (pUserEnd > pCredEnd)
+ pUserEnd = pCredEnd;
+ else {
+ url->Pass = pUserEnd + 1;
+ }
+ StrBufPeek(UrlStr, pUserEnd, 0, '\0');
+ StrBufPeek(UrlStr, pCredEnd, 0, '\0');
+ }
+
+ pPort = NULL;
+ if (*pStartHost == '[') {
+ pStartHost ++;
+ pEndHost = strchr(pStartHost, ']');
+ if (pEndHost == NULL) {
+ free(url);
+ return 0; /* invalid syntax, no ipv6 */
+ }
+ if (*(pEndHost + 1) == ':')
+ pPort = pEndHost + 2;
+ url->af = AF_INET6;
+ }
+ else {
+ pPort = strchr(pStartHost, ':');
+ if (pPort != NULL)
+ pPort ++;
+ }
+ if (pPort != NULL)
+ url->Port = atol(pPort);
+ url->IsIP = inet_pton(url->af, pStartHost, &url->Addr);
+ return 1;
+}
+/*
+{
+
+ if (threadding)
+ n_smarthosts = get_hosts(char *mxbuf, char *rectype);
+}
+*/
/*
* smtp_do_procmsg()
*
void *vQE;
long len;
const char *Key;
-
+ int nRelays = 0;
+ ParsedURL *RelayUrls = NULL;
+ int HaveBuffers = 0;
+ StrBuf *Msg =NULL;
+
CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum);
///strcpy(envelope_from, "");
return;
}
+ {
+ char mxbuf[SIZ];
+ ParsedURL **Url = &RelayUrls; ///&MyQItem->Relay;
+ nRelays = get_hosts(mxbuf, "smarthost");
+ if (nRelays > 0) {
+ StrBuf *All;
+ StrBuf *One;
+ const char *Pos = NULL;
+ All = NewStrBufPlain(mxbuf, -1);
+ One = NewStrBufPlain(NULL, StrLength(All) + 1);
+
+ while (Pos != StrBufNOTNULL) {
+ StrBufExtract_NextToken(One, All, &Pos, '|');
+ if (!ParseURL(Url, One, 25))
+ CtdlLogPrintf(CTDL_DEBUG, "Failed to parse: %s\n", ChrPtr(One));
+ else
+ Url = &(*Url)->Next;
+ }
+ }
+ }
+
It = GetNewHashPos(MyQItem->MailQEntries, 0);
while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
{
{
int n = MsgCount++;
int i = 1;
- StrBuf *Msg = smtp_load_msg(MyQItem, n);
+ Msg = smtp_load_msg(MyQItem, n);
It = GetNewHashPos(MyQItem->MailQEntries, 0);
while ((i <= MyQItem->ActiveDeliveries) &&
(GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)))
{
MailQEntry *ThisItem = vQE;
if (ThisItem->Active == 1) {
+ int KeepBuffers = (i == MyQItem->ActiveDeliveries);
if (i > 1) n = MsgCount++;
CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Trying <%s>\n", ChrPtr(ThisItem->Recipient));
- smtp_try(MyQItem, ThisItem, Msg, (i == MyQItem->ActiveDeliveries), n);
+ smtp_try(MyQItem, ThisItem, Msg, KeepBuffers, n, RelayUrls);
+ if (KeepBuffers) HaveBuffers = 1;
i++;
}
}
// TODO: bounce & delete?
}
+ if (!HaveBuffers) {
+ FreeStrBuf (&Msg);
+// TODO : free RelayUrls
+ }
}
/* SMTP CLIENT (Queue Management) STUFF */
/*****************************************************************************/
+typedef struct ParsedURL ParsedURL;
+struct ParsedURL {
+ StrBuf *URL;
+ unsigned Port;
+ const char *User;
+ const char *Pass;
+ const char *LocalPart;
+ int IsIP;
+ int af;
+ struct hostent *HEnt;
+ struct in6_addr Addr;
+ ParsedURL *Next;
+};
+
+
#define MaxAttempts 15
typedef struct _delivery_attempt {
time_t when;
long ActiveDeliveries;
StrBuf *EnvelopeFrom;
StrBuf *BounceTo;
+ ParsedURL *URL;
} OneQueItem;
typedef void (*QItemHandler)(OneQueItem *Item, StrBuf *Line, const char **Pos);