X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=94777828a5974c6f9e7229a97c765b7a8531beb3;hb=ae6cddd2ff16f7056db3c773a96f20bc77f5d2c3;hp=9a7566f7d4d84f777eac6c1a2e8085b631467695;hpb=c06372f95e9313f29a331c7d76ef812e480f3c84;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 9a7566f7d..94777828a 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -69,9 +69,9 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents) { - syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + assert(IO->ShutdownAbort); IO->ShutdownAbort(IO); } @@ -101,13 +101,13 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); pthread_mutex_lock(&DBEventQueueMutex); - syslog(LOG_DEBUG, "DBEVENT Q\n"); + EVM_syslog(LOG_DEBUG, "DBEVENT Q\n"); i = ++evdb_count ; Put(DBInboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&DBEventQueueMutex); ev_async_send (event_db, &DBAddJob); - syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); + EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); return eDBQuery; } @@ -116,7 +116,7 @@ void ShutDownDBCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); - syslog(LOG_DEBUG, "DBEVENT\n"); + EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); assert(IO->Terminate); @@ -130,7 +130,7 @@ void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); ev_idle_stop(event_db, &IO->db_unwind_stack); @@ -154,7 +154,9 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) break; case eTerminateConnection: case eAbort: - ShutDownDBCLient(IO); + ev_idle_stop(event_db, &IO->db_unwind_stack); + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + ShutDownDBCLient(IO); } } @@ -193,13 +195,35 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) ev_cleanup_start(event_base, &IO->abort_by_shutdown); pthread_mutex_lock(&EventQueueMutex); - syslog(LOG_DEBUG, "EVENT Q\n"); + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); + i = ++evbase_count; + Put(InboundEventQueue, IKEY(i), h, NULL); + pthread_mutex_unlock(&EventQueueMutex); + + ev_async_send (event_base, &AddJob); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); + return eSendReply; +} + +extern eNextState evcurl_handle_start(AsyncIO *IO); + +eNextState QueueCurlContext(AsyncIO *IO) +{ + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->IO = IO; + h->EvAttch = evcurl_handle_start; + + pthread_mutex_lock(&EventQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); ev_async_send (event_base, &AddJob); - syslog(LOG_DEBUG, "EVENT Q Done.\n"); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); return eSendReply; } @@ -242,7 +266,7 @@ void ShutDownCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); - syslog(LOG_DEBUG, "EVENT x %d\n", IO->SendBuf.fd); + EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); ev_cleanup_stop(event_base, &IO->abort_by_shutdown); StopClientWatchers(IO); @@ -270,6 +294,7 @@ eReadState HandleInbound(AsyncIO *IO) while ((Finished == eBufferNotEmpty) && ((IO->NextState == eReadMessage)|| (IO->NextState == eReadMore)|| + (IO->NextState == eReadFile)|| (IO->NextState == eReadPayload))) { if (IO->RecvBuf.nBlobBytesWanted != 0) { @@ -277,7 +302,16 @@ eReadState HandleInbound(AsyncIO *IO) } else { /* Reading lines... */ //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP - if (IO->LineReader) + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; + } + } + else if (IO->LineReader) Finished = IO->LineReader(IO); else Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); @@ -300,15 +334,6 @@ eReadState HandleInbound(AsyncIO *IO) ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); Finished = StrBufCheckBuffer(&IO->RecvBuf); - if ((IO->NextState == eReadFile) && - (Finished == eBufferNotEmpty)) - { - Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); - if (Finished == eReadSuccess) - { - IO->NextState = eSendReply; - } - } } } @@ -590,20 +615,29 @@ static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { const char *errmsg; - long rc; ssize_t nbytes; AsyncIO *IO = watcher->data; switch (IO->NextState) { case eReadFile: - rc = FileRecvChunked(&IO->IOB, &errmsg); - if (rc < 0) + nbytes = FileRecvChunked(&IO->IOB, &errmsg); + if (nbytes < 0) StrBufPlain(IO->ErrMsg, errmsg, -1); + else + { + if (IO->IOB.ChunkSendRemain == 0) + { + IO->NextState = eSendReply; + } + else + return; + } break; default: nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); break; } + #ifdef BIGBAD_IODBG { int rv = 0; @@ -647,9 +681,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) return; } else if (nbytes == -1) { /// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno); - syslog(LOG_DEBUG, - "EVENT: Socket Invalid! %s \n", - strerror(errno)); + EV_syslog(LOG_DEBUG, + "EVENT: Socket Invalid! %s \n", + strerror(errno)); + ShutDownCLient(IO); return; } } @@ -658,9 +693,10 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); assert(IO->DNSFail); + assert(IO->DNSQuery->PostDNS); switch (IO->DNSQuery->PostDNS(IO)) { case eAbort: @@ -682,23 +718,23 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r IPPROTO_TCP); if (IO->SendBuf.fd < 0) { - syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); + EV_syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); return eAbort; } fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - syslog(LOG_DEBUG, - "EVENT: unable to get socket flags! %s \n", - strerror(errno)); + EV_syslog(LOG_DEBUG, + "EVENT: unable to get socket flags! %s \n", + strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); return eAbort; } fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { - syslog(LOG_DEBUG, - "EVENT: unable to set socket nonblocking flags! %s \n", - strerror(errno)); + EV_syslog(LOG_DEBUG, + "EVENT: unable to set socket nonblocking flags! %s \n", + strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); close(IO->SendBuf.fd); IO->SendBuf.fd = IO->RecvBuf.fd = -1; @@ -726,13 +762,13 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); if (rc >= 0){ - syslog(LOG_DEBUG, "connect() immediate success.\n"); + EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); set_start_callback(event_base, IO, 0); ev_timer_start(event_base, &IO->rw_timeout); return IO->NextState; } else if (errno == EINPROGRESS) { - syslog(LOG_DEBUG, "connect() have to wait now.\n"); + EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n"); ev_io_init(&IO->conn_event, IO_connestd_callback, IO->SendBuf.fd, EV_READ|EV_WRITE); IO->conn_event.data = IO; @@ -747,7 +783,7 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r IO->conn_fail_immediate.data = IO; ev_idle_start(event_base, &IO->conn_fail_immediate); - syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); + EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); return IO->NextState; }