X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=8484cad4cfb9437e52682fcebce43655755e4bfe;hb=1bd9b2cfbdf91a8c13c3cbd11e37d99830867a64;hp=ba18107d76c1307c46ab7754c4c39796eb28dc02;hpb=6ae6c71b048ef1af433a5f9d8fd0cac8be680959;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index ba18107d7..8484cad4c 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -70,25 +70,33 @@ extern int event_add_pipe[2]; extern citthread_mutex_t EventQueueMutex; -extern void *QueueEventAddPtr; -extern AsyncIO *QueueThisIO; -extern EventContextAttach EventContextAttachPtr; +extern HashList *InboundEventQueue; +#define SEND_EVENT 1 +#define RECV_EVENT 2 + int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB) { + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->Ctx = Ctx; + h->EvAttch = CB; + citthread_mutex_lock(&EventQueueMutex); if (event_add_pipe[1] == -1) { citthread_mutex_unlock(&EventQueueMutex); - + free (h); return -1; } - - QueueEventAddPtr = Ctx; - EventContextAttachPtr = CB; - QueueThisIO = IO; + CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n"); + i = GetCount(InboundEventQueue); + Put(InboundEventQueue, IKEY(i), h, NULL); + citthread_mutex_unlock(&EventQueueMutex); write(event_add_pipe[1], "+_", 1); - citthread_mutex_unlock(&EventQueueMutex); + CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n"); return 0; } @@ -117,29 +125,40 @@ void FreeAsyncIOContents(AsyncIO *IO) } /* -static void -setup_signal_handlers(struct instance *instance) -{ - signal(SIGPIPE, SIG_IGN); - - event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigterm_event, NULL); - - event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigint_event, NULL); - - event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigquit_event, NULL); -} + static void + setup_signal_handlers(struct instance *instance) + { + signal(SIGPIPE, SIG_IGN); + + event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST, + exit_event_callback, instance); + event_add(&instance->sigterm_event, NULL); + + event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST, + exit_event_callback, instance); + event_add(&instance->sigint_event, NULL); + + event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST, + exit_event_callback, instance); + event_add(&instance->sigquit_event, NULL); + } */ void ShutDownCLient(AsyncIO *IO) { - event_del(&IO->send_event); - event_del(&IO->recv_event); + CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock); + switch (IO->active_event) { + case SEND_EVENT: + event_del(&IO->send_event); + break; + case RECV_EVENT: + event_del(&IO->recv_event); + break; + case 0: + // no event active here; just bail out. + break; + } + IO->active_event = 0; IO->Terminate(IO->Data); // citthread_mutex_lock(&EventQueueMutex); @@ -180,6 +199,7 @@ eReadState HandleInbound(AsyncIO *IO) if (Finished != eMustReadMore) { event_del(&IO->recv_event); + IO->active_event = 0; IO->NextState = IO->ReadDone(IO->Data); Finished = StrBufCheckBuffer(&IO->RecvBuf); } @@ -191,6 +211,7 @@ eReadState HandleInbound(AsyncIO *IO) { IO->NextState = IO->SendDone(IO->Data); event_add(&IO->send_event, NULL); + IO->active_event = SEND_EVENT; } else if ((IO->NextState == eTerminateConnection) || @@ -208,41 +229,77 @@ IO_send_callback(int fd, short event, void *ctx) (void)fd; (void)event; - + CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d : [%s%s%s%s]\n", + (int) fd, + (event&EV_TIMEOUT) ? " timeout" : "", + (event&EV_READ) ? " read" : "", + (event&EV_WRITE) ? " write" : "", + (event&EV_SIGNAL) ? " signal" : ""); + /// assert(fd == IO->sock); rc = StrBuf_write_one_chunk_callback(fd, event, &IO->SendBuf); if (rc == 0) { - event_del(&IO->send_event); - switch (IO->NextState) { - case eSendReply: - break; - case eSendMore: - IO->NextState = IO->SendDone(IO->Data); - - if ((IO->NextState == eTerminateConnection) || - (IO->NextState == eAbort) ) - ShutDownCLient(IO); - else - event_add(&IO->send_event, NULL); - break; - case eReadMessage: - if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { - HandleInbound(IO); - } - else { - event_add(&IO->recv_event, NULL); - } - - break; - case eAbort: - break; - } + +#ifdef BIGBAD_IODBG + { + int rv = 0; + char fn [SIZ]; + FILE *fd; + const char *pch = ChrPtr(IO->SendBuf.Buf); + const char *pchh = IO->SendBuf.ReadWritePointer; + long nbytes; + + if (pchh == NULL) + pchh = pch; + + nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch); + snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", "smtpev", IO->sock); + + fd = fopen(fn, "a+"); + fprintf(fd, "Read: BufSize: %ld BufContent: [", + nbytes); + rv = fwrite(pchh, nbytes, 1, fd); + fprintf(fd, "]\n"); + + + fclose(fd); + } +#endif + event_del(&IO->send_event); + IO->active_event = 0; + switch (IO->NextState) { + case eSendReply: + break; + case eSendMore: + IO->NextState = IO->SendDone(IO->Data); + + if ((IO->NextState == eTerminateConnection) || + (IO->NextState == eAbort) ) + ShutDownCLient(IO); + else { + event_add(&IO->send_event, NULL); + IO->active_event = SEND_EVENT; + } + break; + case eReadMessage: + if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { + HandleInbound(IO); + } + else { + event_add(&IO->recv_event, NULL); + IO->active_event = RECV_EVENT; + } + + break; + case eAbort: + break; + } } else if (rc > 0) - return; + ShutDownCLient(IO); // else ///abort! } @@ -258,11 +315,18 @@ IO_recv_callback(int fd, short event, void *ctx) // assert(fd == sb->fd); + CtdlLogPrintf(CTDL_DEBUG, "EVENT <- %d : [%s%s%s%s]\n", + (int) fd, + (event&EV_TIMEOUT) ? " timeout" : "", + (event&EV_READ) ? " read" : "", + (event&EV_WRITE) ? " write" : "", + (event&EV_SIGNAL) ? " signal" : ""); nbytes = StrBuf_read_one_chunk_callback(fd, event, &IO->RecvBuf); if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - /// TODO: this is a timeout??? sock_buff_invoke_free(sb, 0); + ShutDownCLient(IO); +/// TODO: this is a timeout??? sock_buff_invoke_free(sb, 0); seems as if socket is gone then? return; } else if (nbytes == -1) { /// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno); @@ -270,15 +334,6 @@ IO_recv_callback(int fd, short event, void *ctx) } } -void IOReadNextLine(AsyncIO *IO, int timeout) -{ - -} - -void IOReadNextBLOB(AsyncIO *IO, int timeout, long size) -{ -} - void InitEventIO(AsyncIO *IO, void *pData, IO_CallBack ReadDone, @@ -308,6 +363,7 @@ void InitEventIO(AsyncIO *IO, if (ReadFirst) { IO->NextState = eReadMessage; event_add(&IO->recv_event, NULL); + IO->active_event = RECV_EVENT; } else { IO->NextState = eSendReply;