X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=cdcee9f6e99c7916fb87fb973ff843b19d8347b3;hb=c855d497545dad80942a194624c111a54cd1fdc7;hp=b5e54b9593047411e3780e60411c49ed2ed14ed8;hpb=77e22b6bbcf9fa8c690ad3872f9a65d969d45335;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index b5e54b959..cdcee9f6e 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -227,18 +227,6 @@ eNextState QueueCurlContext(AsyncIO *IO) return eSendReply; } -int ShutDownEventQueue(void) -{ - pthread_mutex_lock(&DBEventQueueMutex); - ev_async_send (event_db, &DBExitEventLoop); - pthread_mutex_unlock(&DBEventQueueMutex); - - pthread_mutex_lock(&EventQueueMutex); - ev_async_send (EV_DEFAULT_ &ExitEventLoop); - pthread_mutex_unlock(&EventQueueMutex); - return 0; -} - void FreeAsyncIOContents(AsyncIO *IO) { FreeStrBuf(&IO->IOBuf); @@ -271,11 +259,11 @@ void ShutDownCLient(AsyncIO *IO) ev_cleanup_stop(event_base, &IO->abort_by_shutdown); StopClientWatchers(IO); - if (IO->DNSChannel != NULL) { - ares_destroy(IO->DNSChannel); - ev_io_stop(event_base, &IO->dns_recv_event); - ev_io_stop(event_base, &IO->dns_send_event); - IO->DNSChannel = NULL; + if (IO->DNS.Channel != NULL) { + ares_destroy(IO->DNS.Channel); + ev_io_stop(event_base, &IO->DNS.recv_event); + ev_io_stop(event_base, &IO->DNS.send_event); + IO->DNS.Channel = NULL; } assert(IO->Terminate); IO->Terminate(IO); @@ -695,58 +683,80 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) AsyncIO *IO = watcher->data; EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); - assert(IO->DNSFail); - assert(IO->DNSQuery->PostDNS); - switch (IO->DNSQuery->PostDNS(IO)) + assert(IO->DNS.Fail); + assert(IO->DNS.Query->PostDNS); + switch (IO->DNS.Query->PostDNS(IO)) { case eAbort: - switch (IO->DNSFail(IO)) { + switch (IO->DNS.Fail(IO)) { case eAbort: ShutDownCLient(IO); default: break; - } default: break; } } -eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout) + +eNextState EvConnectSock(AsyncIO *IO, + double conn_timeout, + double first_rw_timeout, + int ReadFirst) { - int fdflags; + int fdflags; int rc = -1; - IO->SendBuf.fd = IO->RecvBuf.fd = + become_session(IO->CitContext); + + if (ReadFirst) { + IO->NextState = eReadMessage; + } + else { + IO->NextState = eSendReply; + } + + IO->SendBuf.fd = IO->RecvBuf.fd = socket( - (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, - SOCK_STREAM, + (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, + SOCK_STREAM, IPPROTO_TCP); if (IO->SendBuf.fd < 0) { - EV_syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); + EV_syslog(LOG_ERR, + "EVENT: socket() failed: %s\n", + strerror(errno)); + + StrBufPrintf(IO->ErrMsg, + "Failed to create socket: %s", + strerror(errno)); return eAbort; } fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - EV_syslog(LOG_DEBUG, + EV_syslog(LOG_DEBUG, "EVENT: unable to get socket flags! %s \n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to get socket flags: %s", + strerror(errno)); return eAbort; } fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { - EV_syslog(LOG_DEBUG, - "EVENT: unable to set socket nonblocking flags! %s \n", - strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); + EV_syslog( + LOG_DEBUG, + "EVENT: unable to set socket nonblocking flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to set socket flags: %s", + strerror(errno)); close(IO->SendBuf.fd); IO->SendBuf.fd = IO->RecvBuf.fd = -1; return eAbort; } -/* TODO: maye we could use offsetof() to calc the position of data... +/* TODO: maye we could use offsetof() to calc the position of data... * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher */ ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ); @@ -756,16 +766,20 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0); IO->conn_fail.data = IO; - ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout, 0); + ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0); IO->rw_timeout.data = IO; /* Bypass it like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ /// ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = inet_addr("127.0.0.1"); if (IO->ConnectMe->IPv6) - rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6)); + rc = connect(IO->SendBuf.fd, + &IO->ConnectMe->Addr, + sizeof(struct sockaddr_in6)); else - rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); + rc = connect(IO->SendBuf.fd, + (struct sockaddr_in *)&IO->ConnectMe->Addr, + sizeof(struct sockaddr_in)); if (rc >= 0){ EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); @@ -776,7 +790,11 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r else if (errno == EINPROGRESS) { EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n"); - ev_io_init(&IO->conn_event, IO_connestd_callback, IO->SendBuf.fd, EV_READ|EV_WRITE); + ev_io_init(&IO->conn_event, + IO_connestd_callback, + IO->SendBuf.fd, + EV_READ|EV_WRITE); + IO->conn_event.data = IO; ev_io_start(event_base, &IO->conn_event); @@ -788,9 +806,11 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r IO_connfailimmediate_callback); IO->conn_fail_immediate.data = IO; ev_idle_start(event_base, &IO->conn_fail_immediate); - + EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to connect: %s", + strerror(errno)); return IO->NextState; } return IO->NextState; @@ -802,26 +822,9 @@ void SetNextTimeout(AsyncIO *IO, double timeout) ev_timer_again (event_base, &IO->rw_timeout); } -eNextState InitEventIO(AsyncIO *IO, - void *pData, - double conn_timeout, - double first_rw_timeout, - int ReadFirst) -{ - IO->Data = pData; - become_session(IO->CitContext); - - if (ReadFirst) { - IO->NextState = eReadMessage; - } - else { - IO->NextState = eSendReply; - } - return event_connect_socket(IO, conn_timeout, first_rw_timeout); -} -eNextState ReAttachIO(AsyncIO *IO, - void *pData, +eNextState ReAttachIO(AsyncIO *IO, + void *pData, int ReadFirst) { IO->Data = pData; @@ -837,3 +840,63 @@ eNextState ReAttachIO(AsyncIO *IO, return IO->NextState; } + +void InitIOStruct(AsyncIO *IO, + void *Data, + eNextState NextState, + IO_LineReaderCallback LineReader, + IO_CallBack DNS_Fail, + IO_CallBack SendDone, + IO_CallBack ReadDone, + IO_CallBack Terminate, + IO_CallBack ConnFail, + IO_CallBack Timeout, + IO_CallBack ShutdownAbort) +{ + IO->Data = Data; + + IO->CitContext = CloneContext(CC); + ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + + IO->NextState = NextState; + + IO->SendDone = SendDone; + IO->ReadDone = ReadDone; + IO->Terminate = Terminate; + IO->LineReader = LineReader; + IO->ConnFail = ConnFail; + IO->Timeout = Timeout; + IO->ShutdownAbort = ShutdownAbort; + + IO->DNS.Fail = DNS_Fail; + + IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024); + IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024); + IO->IOBuf = NewStrBuf(); + +} + +extern int evcurl_init(AsyncIO *IO); + +int InitcURLIOStruct(AsyncIO *IO, + void *Data, + const char* Desc, + IO_CallBack SendDone, + IO_CallBack Terminate, + IO_CallBack ShutdownAbort) +{ + IO->Data = Data; + + IO->CitContext = CloneContext(CC); + ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->ShutdownAbort = ShutdownAbort; + + strcpy(IO->HttpReq.errdesc, Desc); + + + return evcurl_init(IO); + +}