X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=91dcabf758e4a38244f84618e9bd32272a325697;hb=d406f030612c8793672a4172ae5d6d64a47dd5df;hp=c7e314db43ba675eaf9bf2abe21e60320afa9945;hpb=3b1a3e6685b0c1b21373eea9ba3e11b95ca4332c;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index c7e314db4..91dcabf75 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -69,7 +69,7 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents) { - CtdlLogPrintf(CTDL_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); AsyncIO *IO = watcher->data; assert(IO->ShutdownAbort); @@ -81,13 +81,13 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher * Server DB IO */ extern int evdb_count; -extern citthread_mutex_t DBEventQueueMutex; +extern pthread_mutex_t DBEventQueueMutex; extern HashList *DBInboundEventQueue; extern struct ev_loop *event_db; extern ev_async DBAddJob; extern ev_async DBExitEventLoop; -int QueueDBOperation(AsyncIO *IO, IO_CallBack CB) +eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) { IOAddHandler *h; int i; @@ -100,15 +100,15 @@ int QueueDBOperation(AsyncIO *IO, IO_CallBack CB) IO->db_abort_by_shutdown.data = IO; ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); - citthread_mutex_lock(&DBEventQueueMutex); - CtdlLogPrintf(CTDL_DEBUG, "DBEVENT Q\n"); + pthread_mutex_lock(&DBEventQueueMutex); + syslog(LOG_DEBUG, "DBEVENT Q\n"); i = ++evdb_count ; Put(DBInboundEventQueue, IKEY(i), h, NULL); - citthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_unlock(&DBEventQueueMutex); ev_async_send (event_db, &DBAddJob); - CtdlLogPrintf(CTDL_DEBUG, "DBEVENT Q Done.\n"); - return 0; + syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); + return eDBQuery; } void ShutDownDBCLient(AsyncIO *IO) @@ -116,7 +116,7 @@ void ShutDownDBCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); - CtdlLogPrintf(CTDL_DEBUG, "DBEVENT\n"); + syslog(LOG_DEBUG, "DBEVENT\n"); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); assert(IO->Terminate); @@ -130,7 +130,7 @@ void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - CtdlLogPrintf(CTDL_DEBUG, "event: %s\n", __FUNCTION__); + syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); ev_idle_stop(event_db, &IO->db_unwind_stack); @@ -145,9 +145,11 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) case eConnect: case eSendReply: case eSendMore: + case eSendFile: case eReadMessage: case eReadMore: case eReadPayload: + case eReadFile: ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); break; case eTerminateConnection: @@ -170,14 +172,14 @@ eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) * Client IO */ extern int evbase_count; -extern citthread_mutex_t EventQueueMutex; +extern pthread_mutex_t EventQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; extern ev_async AddJob; extern ev_async ExitEventLoop; -int QueueEventContext(AsyncIO *IO, IO_CallBack CB) +eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) { IOAddHandler *h; int i; @@ -190,26 +192,26 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB) IO->abort_by_shutdown.data = IO; ev_cleanup_start(event_base, &IO->abort_by_shutdown); - citthread_mutex_lock(&EventQueueMutex); - CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n"); + pthread_mutex_lock(&EventQueueMutex); + syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); - citthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_unlock(&EventQueueMutex); ev_async_send (event_base, &AddJob); - CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n"); - return 0; + syslog(LOG_DEBUG, "EVENT Q Done.\n"); + return eSendReply; } int ShutDownEventQueue(void) { - citthread_mutex_lock(&DBEventQueueMutex); + pthread_mutex_lock(&DBEventQueueMutex); ev_async_send (event_db, &DBExitEventLoop); - citthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_unlock(&DBEventQueueMutex); - citthread_mutex_lock(&EventQueueMutex); + pthread_mutex_lock(&EventQueueMutex); ev_async_send (EV_DEFAULT_ &ExitEventLoop); - citthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_unlock(&EventQueueMutex); return 0; } @@ -240,7 +242,7 @@ void ShutDownCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); - CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->SendBuf.fd); + syslog(LOG_DEBUG, "EVENT x %d\n", IO->SendBuf.fd); ev_cleanup_stop(event_base, &IO->abort_by_shutdown); StopClientWatchers(IO); @@ -260,6 +262,7 @@ void ShutDownCLient(AsyncIO *IO) eReadState HandleInbound(AsyncIO *IO) { + const char *Err = NULL; eReadState Finished = eBufferNotEmpty; become_session(IO->CitContext); @@ -267,6 +270,7 @@ eReadState HandleInbound(AsyncIO *IO) while ((Finished == eBufferNotEmpty) && ((IO->NextState == eReadMessage)|| (IO->NextState == eReadMore)|| + (IO->NextState == eReadFile)|| (IO->NextState == eReadPayload))) { if (IO->RecvBuf.nBlobBytesWanted != 0) { @@ -274,14 +278,22 @@ eReadState HandleInbound(AsyncIO *IO) } else { /* Reading lines... */ //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP - if (IO->LineReader) + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; + } + } + else if (IO->LineReader) Finished = IO->LineReader(IO); else Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); switch (Finished) { case eMustReadMore: /// read new from socket... - return Finished; break; case eBufferNotEmpty: /* shouldn't happen... */ case eReadSuccess: /// done for now... @@ -302,6 +314,9 @@ eReadState HandleInbound(AsyncIO *IO) } switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; case eSendReply: case eSendMore: assert(IO->SendDone); @@ -310,9 +325,12 @@ eReadState HandleInbound(AsyncIO *IO) break; case eReadPayload: case eReadMore: + case eReadFile: ev_io_start(event_base, &IO->recv_event); break; case eTerminateConnection: +//////TODOxxxx + break; case eAbort: ShutDownCLient(IO); break; @@ -332,6 +350,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; AsyncIO *IO = watcher->data; + const char *errmsg = NULL; become_session(IO->CitContext); #ifdef BIGBAD_IODBG @@ -358,7 +377,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (!rv) printf("failed to write debug to %s!\n", fn); fprintf(fd, "]\n"); #endif - rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + switch (IO->NextState) { + case eSendFile: + rc = FileSendChunked(&IO->IOB, &errmsg); + if (rc < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + break; + default: + rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + } #ifdef BIGBAD_IODBG fprintf(fd, "Sent: BufSize: %d bytes.\n", rc); @@ -380,6 +407,34 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) ev_io_start(event_base, &IO->send_event); } break; + case eSendFile: + if (IO->IOB.ChunkSendRemain > 0) { + ev_io_start(event_base, &IO->recv_event); + } else { + assert(IO->ReadDone); + IO->NextState = IO->ReadDone(IO); + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + break; + case eSendReply: + case eSendMore: + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + break; + case eTerminateConnection: + case eAbort: + break; + } + } + break; case eSendReply: if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) break; @@ -387,6 +442,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eReadMore: case eReadMessage: case eReadPayload: + case eReadFile: if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { HandleInbound(IO); } @@ -420,11 +476,13 @@ set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) switch(IO->NextState) { case eReadMore: case eReadMessage: + case eReadFile: ev_io_start(event_base, &IO->recv_event); break; case eSendReply: case eSendMore: case eReadPayload: + case eSendFile: become_session(IO->CitContext); IO_send_callback(loop, &IO->send_event, revents); break; @@ -532,10 +590,33 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { + const char *errmsg; ssize_t nbytes; AsyncIO *IO = watcher->data; - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + switch (IO->NextState) { + case eReadFile: + nbytes = FileRecvChunked(&IO->IOB, &errmsg); + syslog(LOG_DEBUG, "****************nbytes: %ld ChunkRemain: %ldx.\n", + nbytes, IO->IOB.ChunkSendRemain); + if (nbytes < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + else + { + if (IO->IOB.ChunkSendRemain == 0) + { + IO->NextState = eSendReply; + syslog(LOG_DEBUG, "***********************************xxxx.\n"); + } + else + return; + } + break; + default: + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + break; + } + #ifdef BIGBAD_IODBG { int rv = 0; @@ -579,9 +660,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) return; } else if (nbytes == -1) { /// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno); - CtdlLogPrintf(CTDL_DEBUG, - "EVENT: Socket Invalid! %s \n", - strerror(errno)); + syslog(LOG_DEBUG, + "EVENT: Socket Invalid! %s \n", + strerror(errno)); + ShutDownCLient(IO); return; } } @@ -590,13 +672,14 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - CtdlLogPrintf(CTDL_DEBUG, "event: %s\n", __FUNCTION__); + syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); - + assert(IO->DNSFail); + assert(IO->DNSQuery->PostDNS); switch (IO->DNSQuery->PostDNS(IO)) { case eAbort: - ShutDownCLient(IO); + IO->DNSFail(IO); default: break; } @@ -614,23 +697,23 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r IPPROTO_TCP); if (IO->SendBuf.fd < 0) { - CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); + syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); return eAbort; } fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - CtdlLogPrintf(CTDL_DEBUG, - "EVENT: unable to get socket flags! %s \n", - strerror(errno)); + syslog(LOG_DEBUG, + "EVENT: unable to get socket flags! %s \n", + strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); return eAbort; } fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { - CtdlLogPrintf(CTDL_DEBUG, - "EVENT: unable to set socket nonblocking flags! %s \n", - strerror(errno)); + syslog(LOG_DEBUG, + "EVENT: unable to set socket nonblocking flags! %s \n", + strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); close(IO->SendBuf.fd); IO->SendBuf.fd = IO->RecvBuf.fd = -1; @@ -658,13 +741,13 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); if (rc >= 0){ - CtdlLogPrintf(CTDL_DEBUG, "connect() immediate success.\n"); + syslog(LOG_DEBUG, "connect() immediate success.\n"); set_start_callback(event_base, IO, 0); ev_timer_start(event_base, &IO->rw_timeout); return IO->NextState; } else if (errno == EINPROGRESS) { - CtdlLogPrintf(CTDL_DEBUG, "connect() have to wait now.\n"); + syslog(LOG_DEBUG, "connect() have to wait now.\n"); ev_io_init(&IO->conn_event, IO_connestd_callback, IO->SendBuf.fd, EV_READ|EV_WRITE); IO->conn_event.data = IO; @@ -679,7 +762,7 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r IO->conn_fail_immediate.data = IO; ev_idle_start(event_base, &IO->conn_fail_immediate); - CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno)); + syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); return IO->NextState; }