From: Wilfried Goesgens Date: Tue, 4 Jan 2011 23:28:13 +0000 (+0100) Subject: libev/libc-ares migration X-Git-Tag: v8.11~1127 X-Git-Url: https://code.citadel.org/?p=citadel.git;a=commitdiff_plain;h=d268f98de5cfcd79b55869b79cc08db6e028c108 libev/libc-ares migration - resolving of the ip address is working now too; handling whether its an ipv6 host still missing. - libev doesn't succeed on doing async connect() --- diff --git a/citadel/event_client.c b/citadel/event_client.c index 59be13321..47856c8a6 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -375,11 +375,13 @@ int event_connect_socket(AsyncIO *IO) int rc = -1; IO->SendBuf.fd = - IO->RecvBuf.fd = - IO->sock = socket(IO->curr_ai->ai_family, + IO->RecvBuf.fd = + IO->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); +/* +IO->curr_ai->ai_family, IO->curr_ai->ai_socktype, IO->curr_ai->ai_protocol); - +*/ if (IO->sock < 0) { CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); @@ -411,7 +413,20 @@ int event_connect_socket(AsyncIO *IO) ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE); IO->send_event.data = IO; - rc = connect(IO->sock, IO->curr_ai->ai_addr, IO->curr_ai->ai_addrlen); + unsigned short dport = atoi("25"); ///todo + struct sockaddr_in saddr; + memset( (struct sockaddr_in *)&saddr, '\0', sizeof( saddr ) ); + + memcpy(&saddr.sin_addr, + IO->HEnt->h_addr_list[0], + sizeof(struct in_addr)); + saddr.sin_family = AF_INET; + saddr.sin_port = htons(dport);/// TODO + rc = connect(IO->sock, + (struct sockaddr *) &saddr, +/// TODO: ipv6?? (IO->HEnt->h_addrtype == AF_INET6)? + /* sizeof(in6_addr):*/ + sizeof(struct sockaddr_in)); if (rc >= 0) { //// freeaddrinfo(res); ev_io_init(&IO->conn_event, IO_send_callback, IO->sock, EV_WRITE); @@ -440,10 +455,10 @@ int event_connect_socket(AsyncIO *IO) CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); close(IO->sock); - IO->curr_ai = IO->curr_ai->ai_next; +/* IO->curr_ai = IO->curr_ai->ai_next; if (IO->curr_ai != NULL) return event_connect_socket(IO); - else + else*/ return -1; } } @@ -455,7 +470,6 @@ void InitEventIO(AsyncIO *IO, IO_CallBack Terminate, IO_CallBack Timeout, IO_CallBack ConnFail, - IO_CallBack CustomDNS, IO_LineReaderCallback LineReader, int ReadFirst) { @@ -471,7 +485,8 @@ void InitEventIO(AsyncIO *IO, else { IO->NextState = eSendReply; } - + IO->IP6 = IO->HEnt->h_addrtype == AF_INET6; +// IO->res = HEnt->h_addr_list[0]; event_connect_socket(IO); } diff --git a/citadel/event_client.h b/citadel/event_client.h index 39a9c11be..3aec485a1 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -30,6 +30,8 @@ struct AsyncIO { struct addrinfo *curr_ai; /* connection related */ + int IP6; + struct hostent *HEnt; int sock; int active_event; eNextState NextState; @@ -83,7 +85,6 @@ void InitEventIO(AsyncIO *IO, IO_CallBack Terminate, IO_CallBack Timeout, IO_CallBack ConnFail, - IO_CallBack CustomDNS, IO_LineReaderCallback LineReader, int ReadFirst); diff --git a/citadel/modules/c-ares-dns/serv_c-ares-dns.c b/citadel/modules/c-ares-dns/serv_c-ares-dns.c index 07e44a113..ae26751b0 100644 --- a/citadel/modules/c-ares-dns/serv_c-ares-dns.c +++ b/citadel/modules/c-ares-dns/serv_c-ares-dns.c @@ -64,242 +64,6 @@ ares_channel Channel; void SockStateCb(void *data, int sock, int read, int write); -/* - c-ares beim connect zum nameserver: if (channel->sock_create_cb) - - SOCK_STATE_CALLBACK(channel, s, 1, 0); -> void Channel::SockStateCb(void *data, int sock, int read, int write) { - - - -lesen der antwort: -#0 node::Channel::QueryCb (arg=0x8579268, status=0x0, timeouts=0x0, abuf=0xbffff0ef "\356|\201\200", alen=0x48) at ../src/node_cares.cc:453 -#1 0x08181884 in qcallback (arg=0x8579278, status=0x857b5a0, timeouts=0x0, abuf=0xbffff0ef "\356|\201\200", alen=0x48) at ../deps/c-ares/ares_query.c:180 -#2 0x0817fbf5 in end_query (channel=, query=0x85790b0, status=0x1, abuf=0xbffff0ef "\356|\201\200", alen=0x48) at ../deps/c-ares/ares_process.c:1233 -#3 0x08180898 in process_answer (channel=, abuf=, alen=, whichserver=0x0, tcp=0x0, now=0xbffff388) at ../deps/c-ares/ares_process.c:612 -#4 0x08180cf8 in read_udp_packets (channel=, read_fds=, read_fd=, now=0xbffff388) at ../deps/c-ares/ares_process.c:498 -#5 0x08181021 in processfds (channel=0x85a9888, read_fds=, read_fd=, write_fds=0x0, write_fd=0xffffffff) at ../deps/c-ares/ares_process.c:153 - --> -static void ParseAnswerMX(QueryArg *arg, unsigned char* abuf, int alen) { - HandleScope scope; - - - - */ - -/* - -static Local HostEntToAddresses(struct hostent* hostent) { - Local addresses = Array::New(); - - - char ip[INET6_ADDRSTRLEN]; - for (int i = 0; hostent->h_addr_list[i]; ++i) { - inet_ntop(hostent->h_addrtype, hostent->h_addr_list[i], ip, sizeof(ip)); - - Local address = String::New(ip); - addresses->Set(Integer::New(i), address); - } - - return addresses; -} - - -static Local HostEntToNames(struct hostent* hostent) { - Local names = Array::New(); - - for (int i = 0; hostent->h_aliases[i]; ++i) { - Local address = String::New(hostent->h_aliases[i]); - names->Set(Integer::New(i), address); - } - - return names; -} - -static inline const char *ares_errno_string(int errorno) { -#define ERRNO_CASE(e) case ARES_##e: return #e; - switch (errorno) { - ERRNO_CASE(SUCCESS) - ERRNO_CASE(ENODATA) - ERRNO_CASE(EFORMERR) - ERRNO_CASE(ESERVFAIL) - ERRNO_CASE(ENOTFOUND) - ERRNO_CASE(ENOTIMP) - ERRNO_CASE(EREFUSED) - ERRNO_CASE(EBADQUERY) - ERRNO_CASE(EBADNAME) - ERRNO_CASE(EBADFAMILY) - ERRNO_CASE(EBADRESP) - ERRNO_CASE(ECONNREFUSED) - ERRNO_CASE(ETIMEOUT) - ERRNO_CASE(EOF) - ERRNO_CASE(EFILE) - ERRNO_CASE(ENOMEM) - ERRNO_CASE(EDESTRUCTION) - ERRNO_CASE(EBADSTR) - ERRNO_CASE(EBADFLAGS) - ERRNO_CASE(ENONAME) - ERRNO_CASE(EBADHINTS) - ERRNO_CASE(ENOTINITIALIZED) - ERRNO_CASE(ELOADIPHLPAPI) - ERRNO_CASE(EADDRGETNETWORKPARAMS) - ERRNO_CASE(ECANCELLED) - default: - assert(0 && "Unhandled c-ares errno"); - return "(UNKNOWN)"; - } -} - - -static void ResolveError(Persistent &cb, int status) { - HandleScope scope; - - Local code = String::NewSymbol(ares_errno_string(status)); - Local message = String::NewSymbol(ares_strerror(status)); - - Local cons1 = String::Concat(code, String::NewSymbol(", ")); - Local cons2 = String::Concat(cons1, message); - - Local e = Exception::Error(cons2); - - Local obj = e->ToObject(); - obj->Set(String::NewSymbol("errno"), Integer::New(status)); - - TryCatch try_catch; - - cb->Call(v8::Context::GetCurrent()->Global(), 1, &e); - - if (try_catch.HasCaught()) { - FatalException(try_catch); - } -} - -static void HostByNameCb(void *data, - int status, - int timeouts, - struct hostent *hostent) { - HandleScope scope; - - Persistent *cb = cb_unwrap(data); - - if (status != ARES_SUCCESS) { - ResolveError(*cb, status); - cb_destroy(cb); - return; - } - - TryCatch try_catch; - - Local addresses = HostEntToAddresses(hostent); - - Local argv[2] = { Local::New(Null()), addresses}; - - (*cb)->Call(v8::Context::GetCurrent()->Global(), 2, argv); - - if (try_catch.HasCaught()) { - FatalException(try_catch); - } - - cb_destroy(cb); -} - - - - -static void cb_call(Persistent &cb, int argc, Local *argv) { - TryCatch try_catch; - - cb->Call(v8::Context::GetCurrent()->Global(), argc, argv); - - if (try_catch.HasCaught()) { - FatalException(try_catch); - } -} - -Handle Channel::GetHostByAddr(const Arguments& args) { - HandleScope scope; - Channel *c = ObjectWrap::Unwrap(args.Holder()); - assert(c); - - if (!args[0]->IsString()) { - return ThrowException(Exception::Error( - String::New("First argument must be a address"))); - } - - if (!args[1]->IsInt32()) { - return ThrowException(Exception::Error( - String::New("Second argument must be an address family"))); - } - - if (!args[2]->IsFunction()) { - return ThrowException(Exception::Error( - String::New("Third argument must be a callback"))); - } - - int family = args[1]->Int32Value(); - if (family != AF_INET6 && family != AF_INET) { - return ThrowException(Exception::Error( - String::New("Unsupported address family"))); - } - - String::Utf8Value address_s(args[0]->ToString()); - - char address_b[sizeof(struct in6_addr)]; - int r = inet_pton(family, *address_s, address_b); - if (r != 1) { - return ThrowException(Exception::Error( - String::New("Invalid network address"))); - } - - int length; - if (family == AF_INET6) - length = sizeof(struct in6_addr); - else - length = sizeof(struct in_addr); - - ares_gethostbyaddr(c->channel, address_b, length, family, HostByAddrCb, cb_persist(args[2])); - - return Undefined(); -} - - - -Handle Channel::GetHostByName(const Arguments& args) { - HandleScope scope; - Channel *c = ObjectWrap::Unwrap(args.Holder()); - assert(c); - - if (!args[0]->IsString()) { - return ThrowException(Exception::Error( - String::New("First argument must be a name"))); - } - - if (!args[1]->IsInt32()) { - return ThrowException(Exception::Error( - String::New("Second argument must be a family"))); - } - - if (!args[2]->IsFunction()) { - return ThrowException(Exception::Error( - String::New("Third argument must be a callback"))); - } - - int family = args[1]->Int32Value(); - if (family != AF_INET6 && family != AF_INET) { - return ThrowException(Exception::Error( - String::New("Unsupported address family"))); - } - - String::Utf8Value name(args[0]->ToString()); - - ares_gethostbyname(c->channel, *name, family, HostByNameCb, cb_persist(args[2])); - - return Undefined(); -} - -*/ - - static void HostByAddrCb(void *data, int status, @@ -465,12 +229,14 @@ int QueueQuery(ns_type Type, char *name, AsyncIO *IO, IO_CallBack PostDNS) int length, family; char address_b[sizeof(struct in6_addr)]; int optmask = 0; - int rfd, wfd; + fd_set rfd, wfd; - optmask |= ARES_OPT_SOCK_STATE_CB; - IO->DNSOptions.sock_state_cb = SockStateCb; - IO->DNSOptions.sock_state_cb_data = IO; - ares_init_options(&IO->DNSChannel, &IO->DNSOptions, optmask); + if (IO->DNSChannel == NULL) { + optmask |= ARES_OPT_SOCK_STATE_CB; + IO->DNSOptions.sock_state_cb = SockStateCb; + IO->DNSOptions.sock_state_cb_data = IO; + ares_init_options(&IO->DNSChannel, &IO->DNSOptions, optmask); + } IO->PostDNS = PostDNS; switch(Type) { diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index 6c04fa5f2..e07bd9d7d 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -191,25 +191,21 @@ typedef struct _stmp_out_msg { eSMTP_C_States State; -/// int SMTPstatus; ->MyQEntry->Status + struct ares_mx_reply *AllMX; + struct ares_mx_reply *CurrMX; + const char *mx_port; + const char *mx_host; + + struct hostent *OneMX; + - int i_mx; - int n_mx; - int num_mxhosts; char mx_user[1024]; char mx_pass[1024]; - char mx_host[1024]; - char mx_port[1024]; - char mxhosts[SIZ]; - StrBuf *msgtext; char *envelope_from; char user[1024]; char node[1024]; char name[1024]; -/// char addr[SIZ]; -> MyQEntry->Recipient -/// char dsn[1024]; -> MyQEntry->StatusMessage -/// char envelope_from_buf[1024]; MyQItem->EnvelopeFrom char mailfrom[1024]; } SmtpOutMsg; void DeleteSmtpOutMsg(void *v) @@ -565,21 +561,6 @@ int smtp_resolve_recipients(SmtpOutMsg *SendMsg) return 0; } -void resolve_mx_hosts(SmtpOutMsg *SendMsg) -{ - /// well this is blocking and sux, but libevent jsut supports async dns since v2 - /* Figure out what mail exchanger host we have to connect to */ - SendMsg->num_mxhosts = getmx(SendMsg->mxhosts, SendMsg->node); - CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: Number of MX hosts for <%s> is %d [%s]\n", - SendMsg->n, SendMsg->node, SendMsg->num_mxhosts, SendMsg->mxhosts); - if (SendMsg->num_mxhosts < 1) { - SendMsg->MyQEntry->Status = 5; - StrBufPrintf(SendMsg->MyQEntry->StatusMessage, - "No MX hosts found for <%s>", SendMsg->node); - return; ///////TODO: abort! - } - -} #define SMTP_ERROR(WHICH_ERR, ERRSTR) {SendMsg->MyQEntry->Status = WHICH_ERR; StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, HKEY(ERRSTR), 0); return eAbort; } #define SMTP_VERROR(WHICH_ERR) { SendMsg->MyQEntry->Status = WHICH_ERR; StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, &ChrPtr(SendMsg->IO.IOBuf)[4], -1, 0); return eAbort; } @@ -588,42 +569,11 @@ void resolve_mx_hosts(SmtpOutMsg *SendMsg) #define SMTP_DBG_SEND() CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: > %s\n", SendMsg->n, ChrPtr(SendMsg->IO.IOBuf)) #define SMTP_DBG_READ() CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: < %s\n", SendMsg->n, ChrPtr(SendMsg->IO.IOBuf)) -void connect_one_smtpsrv(SmtpOutMsg *SendMsg) -{ - char *endpart; - char buf[SIZ]; - extract_token(buf, SendMsg->mxhosts, SendMsg->n_mx, '|', sizeof(buf)); - strcpy(SendMsg->mx_user, ""); - strcpy(SendMsg->mx_pass, ""); - if (num_tokens(buf, '@') > 1) { - strcpy (SendMsg->mx_user, buf); - endpart = strrchr(SendMsg->mx_user, '@'); - *endpart = '\0'; - strcpy (SendMsg->mx_host, endpart + 1); - endpart = strrchr(SendMsg->mx_user, ':'); - if (endpart != NULL) { - strcpy(SendMsg->mx_pass, endpart+1); - *endpart = '\0'; - } - } - else - strcpy (SendMsg->mx_host, buf); - endpart = strrchr(SendMsg->mx_host, ':'); - if (endpart != 0){ - *endpart = '\0'; - strcpy(SendMsg->mx_port, endpart + 1); - } - else { - strcpy(SendMsg->mx_port, "25"); - } - CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: connecting to %s : %s ...\n", - SendMsg->n, SendMsg->mx_host, SendMsg->mx_port); - -} - - -int connect_one_smtpsrv_xamine_result(void *Ctx) +void connect_one_smtpsrv_xamine_result(void *Ctx, + int status, + int timeouts, + struct hostent *hostent) { SmtpOutMsg *SendMsg = Ctx; @@ -661,7 +611,7 @@ int connect_one_smtpsrv_xamine_result(void *Ctx) //// hier: abbrechen & bounce. return -1; } - +/* InitEventIO(&SendMsg->IO, SendMsg, SMTP_C_DispatchReadDone, @@ -672,9 +622,86 @@ int connect_one_smtpsrv_xamine_result(void *Ctx) SMTP_C_MXLookup, SMTP_C_ReadServerStatus, 1); +*/ return 0; } +void get_one_mx_host_name_done(void *Ctx, + int status, + int timeouts, + struct hostent *hostent) +{ + SmtpOutMsg *SendMsg = Ctx; + if ((status == ARES_SUCCESS) && (hostent != NULL) ) { + + SendMsg->IO.HEnt = hostent; + InitEventIO(&SendMsg->IO, SendMsg, + SMTP_C_DispatchReadDone, + SMTP_C_DispatchWriteDone, + SMTP_C_Terminate, + SMTP_C_Timeout, + SMTP_C_ConnFail, + SMTP_C_ReadServerStatus, + 1); + + } +} + +const char *DefaultMXPort = "25"; +void connect_one_smtpsrv(SmtpOutMsg *SendMsg) +{ + //char *endpart; + //char buf[SIZ]; + + SendMsg->mx_port = DefaultMXPort; + +/* TODO: Relay! + *SendMsg->mx_user = '\0'; + *SendMsg->mx_pass = '\0'; + if (num_tokens(buf, '@') > 1) { + strcpy (SendMsg->mx_user, buf); + endpart = strrchr(SendMsg->mx_user, '@'); + *endpart = '\0'; + strcpy (SendMsg->mx_host, endpart + 1); + endpart = strrchr(SendMsg->mx_user, ':'); + if (endpart != NULL) { + strcpy(SendMsg->mx_pass, endpart+1); + *endpart = '\0'; + } + + endpart = strrchr(SendMsg->mx_host, ':'); + if (endpart != 0){ + *endpart = '\0'; + strcpy(SendMsg->mx_port, endpart + 1); + } + } + else +*/ + SendMsg->mx_host = SendMsg->CurrMX->host; + SendMsg->CurrMX = SendMsg->CurrMX->next; + + CtdlLogPrintf(CTDL_DEBUG, + "SMTP client[%ld]: connecting to %s : %s ...\n", + SendMsg->n, + SendMsg->mx_host, + SendMsg->mx_port); + + ares_gethostbyname(SendMsg->IO.DNSChannel, + SendMsg->mx_host, + AF_INET6, /* it falls back to ipv4 in doubt... */ + get_one_mx_host_name_done, + &SendMsg->IO); +/* + if (!QueueQuery(ns_t_a, + SendMsg->mx_host, + &SendMsg->IO, + connect_one_smtpsrv_xamine_result)) + { + /// TODO: abort + } +*/ +} + eNextState SMTPC_read_greeting(SmtpOutMsg *SendMsg) { @@ -924,25 +951,31 @@ eNextState SMTPC_send_dummy(SmtpOutMsg *SendMsg) return eReadMessage; } -eNextState smtp_resolve_one_smtpsrv_start(void *data) -{ +eNextState smtp_resolve_mx_done(void *data) +{/// VParsedDNSReply AsyncIO *IO = data; SmtpOutMsg * SendMsg = IO->Data; -/// resolve_mx_hosts(SendMsg); - //// connect_one_smtpsrv_xamine_result + //// connect_one_smtpsrv_xamine_result + SendMsg->CurrMX = SendMsg->AllMX = IO->VParsedDNSReply; + //// TODO: should we remove the current ares context??? connect_one_smtpsrv(SendMsg); } + + int resolve_mx_records(void *Ctx) { SmtpOutMsg * SendMsg = Ctx; if (!QueueQuery(ns_t_mx, SendMsg->node, &SendMsg->IO, - smtp_resolve_one_smtpsrv_start)) + smtp_resolve_mx_done)) { - /// TODO: abort + SendMsg->MyQEntry->Status = 5; + StrBufPrintf(SendMsg->MyQEntry->StatusMessage, + "No MX hosts found for <%s>", SendMsg->node); + return; ///////TODO: abort! } } @@ -959,6 +992,7 @@ void smtp_try(OneQueItem *MyQItem, SendMsg->n = MsgCount++; SendMsg->MyQEntry = MyQEntry; SendMsg->MyQItem = MyQItem; + SendMsg->IO.Data = SendMsg; if (KeepMsgText) SendMsg->msgtext = MsgText; else