From 1bd9b2cfbdf91a8c13c3cbd11e37d99830867a64 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Mon, 27 Dec 2010 16:45:53 +0100 Subject: [PATCH] Libevent Migration - use a bigger read buffer for the smtp client - differentiate between queue and smtp client in log messages - migrate scheduling of jobs to a locked list --- citadel/event_client.c | 186 ++++++++++++------ citadel/event_client.h | 6 + ...serv_evventclient.c => serv_eventclient.c} | 80 ++++++-- citadel/modules/smtp/serv_smtpeventclient.c | 32 ++- 4 files changed, 210 insertions(+), 94 deletions(-) rename citadel/modules/eventclient/{serv_evventclient.c => serv_eventclient.c} (73%) diff --git a/citadel/event_client.c b/citadel/event_client.c index ba18107d7..8484cad4c 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -70,25 +70,33 @@ extern int event_add_pipe[2]; extern citthread_mutex_t EventQueueMutex; -extern void *QueueEventAddPtr; -extern AsyncIO *QueueThisIO; -extern EventContextAttach EventContextAttachPtr; +extern HashList *InboundEventQueue; +#define SEND_EVENT 1 +#define RECV_EVENT 2 + int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB) { + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->Ctx = Ctx; + h->EvAttch = CB; + citthread_mutex_lock(&EventQueueMutex); if (event_add_pipe[1] == -1) { citthread_mutex_unlock(&EventQueueMutex); - + free (h); return -1; } - - QueueEventAddPtr = Ctx; - EventContextAttachPtr = CB; - QueueThisIO = IO; + CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n"); + i = GetCount(InboundEventQueue); + Put(InboundEventQueue, IKEY(i), h, NULL); + citthread_mutex_unlock(&EventQueueMutex); write(event_add_pipe[1], "+_", 1); - citthread_mutex_unlock(&EventQueueMutex); + CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n"); return 0; } @@ -117,29 +125,40 @@ void FreeAsyncIOContents(AsyncIO *IO) } /* -static void -setup_signal_handlers(struct instance *instance) -{ - signal(SIGPIPE, SIG_IGN); - - event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigterm_event, NULL); - - event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigint_event, NULL); - - event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigquit_event, NULL); -} + static void + setup_signal_handlers(struct instance *instance) + { + signal(SIGPIPE, SIG_IGN); + + event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST, + exit_event_callback, instance); + event_add(&instance->sigterm_event, NULL); + + event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST, + exit_event_callback, instance); + event_add(&instance->sigint_event, NULL); + + event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST, + exit_event_callback, instance); + event_add(&instance->sigquit_event, NULL); + } */ void ShutDownCLient(AsyncIO *IO) { - event_del(&IO->send_event); - event_del(&IO->recv_event); + CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock); + switch (IO->active_event) { + case SEND_EVENT: + event_del(&IO->send_event); + break; + case RECV_EVENT: + event_del(&IO->recv_event); + break; + case 0: + // no event active here; just bail out. + break; + } + IO->active_event = 0; IO->Terminate(IO->Data); // citthread_mutex_lock(&EventQueueMutex); @@ -180,6 +199,7 @@ eReadState HandleInbound(AsyncIO *IO) if (Finished != eMustReadMore) { event_del(&IO->recv_event); + IO->active_event = 0; IO->NextState = IO->ReadDone(IO->Data); Finished = StrBufCheckBuffer(&IO->RecvBuf); } @@ -191,6 +211,7 @@ eReadState HandleInbound(AsyncIO *IO) { IO->NextState = IO->SendDone(IO->Data); event_add(&IO->send_event, NULL); + IO->active_event = SEND_EVENT; } else if ((IO->NextState == eTerminateConnection) || @@ -208,41 +229,77 @@ IO_send_callback(int fd, short event, void *ctx) (void)fd; (void)event; - + CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d : [%s%s%s%s]\n", + (int) fd, + (event&EV_TIMEOUT) ? " timeout" : "", + (event&EV_READ) ? " read" : "", + (event&EV_WRITE) ? " write" : "", + (event&EV_SIGNAL) ? " signal" : ""); + /// assert(fd == IO->sock); rc = StrBuf_write_one_chunk_callback(fd, event, &IO->SendBuf); if (rc == 0) { - event_del(&IO->send_event); - switch (IO->NextState) { - case eSendReply: - break; - case eSendMore: - IO->NextState = IO->SendDone(IO->Data); - - if ((IO->NextState == eTerminateConnection) || - (IO->NextState == eAbort) ) - ShutDownCLient(IO); - else - event_add(&IO->send_event, NULL); - break; - case eReadMessage: - if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { - HandleInbound(IO); - } - else { - event_add(&IO->recv_event, NULL); - } - - break; - case eAbort: - break; - } + +#ifdef BIGBAD_IODBG + { + int rv = 0; + char fn [SIZ]; + FILE *fd; + const char *pch = ChrPtr(IO->SendBuf.Buf); + const char *pchh = IO->SendBuf.ReadWritePointer; + long nbytes; + + if (pchh == NULL) + pchh = pch; + + nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch); + snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", "smtpev", IO->sock); + + fd = fopen(fn, "a+"); + fprintf(fd, "Read: BufSize: %ld BufContent: [", + nbytes); + rv = fwrite(pchh, nbytes, 1, fd); + fprintf(fd, "]\n"); + + + fclose(fd); + } +#endif + event_del(&IO->send_event); + IO->active_event = 0; + switch (IO->NextState) { + case eSendReply: + break; + case eSendMore: + IO->NextState = IO->SendDone(IO->Data); + + if ((IO->NextState == eTerminateConnection) || + (IO->NextState == eAbort) ) + ShutDownCLient(IO); + else { + event_add(&IO->send_event, NULL); + IO->active_event = SEND_EVENT; + } + break; + case eReadMessage: + if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { + HandleInbound(IO); + } + else { + event_add(&IO->recv_event, NULL); + IO->active_event = RECV_EVENT; + } + + break; + case eAbort: + break; + } } else if (rc > 0) - return; + ShutDownCLient(IO); // else ///abort! } @@ -258,11 +315,18 @@ IO_recv_callback(int fd, short event, void *ctx) // assert(fd == sb->fd); + CtdlLogPrintf(CTDL_DEBUG, "EVENT <- %d : [%s%s%s%s]\n", + (int) fd, + (event&EV_TIMEOUT) ? " timeout" : "", + (event&EV_READ) ? " read" : "", + (event&EV_WRITE) ? " write" : "", + (event&EV_SIGNAL) ? " signal" : ""); nbytes = StrBuf_read_one_chunk_callback(fd, event, &IO->RecvBuf); if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - /// TODO: this is a timeout??? sock_buff_invoke_free(sb, 0); + ShutDownCLient(IO); +/// TODO: this is a timeout??? sock_buff_invoke_free(sb, 0); seems as if socket is gone then? return; } else if (nbytes == -1) { /// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno); @@ -270,15 +334,6 @@ IO_recv_callback(int fd, short event, void *ctx) } } -void IOReadNextLine(AsyncIO *IO, int timeout) -{ - -} - -void IOReadNextBLOB(AsyncIO *IO, int timeout, long size) -{ -} - void InitEventIO(AsyncIO *IO, void *pData, IO_CallBack ReadDone, @@ -308,6 +363,7 @@ void InitEventIO(AsyncIO *IO, if (ReadFirst) { IO->NextState = eReadMessage; event_add(&IO->recv_event, NULL); + IO->active_event = RECV_EVENT; } else { IO->NextState = eSendReply; diff --git a/citadel/event_client.h b/citadel/event_client.h index f20152f8d..34023a3c0 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -16,6 +16,7 @@ typedef eReadState (*IO_LineReaderCallback)(AsyncIO *IO); struct AsyncIO { int sock; + int active_event; struct event recv_event, send_event; IOBuffer SendBuf, RecvBuf; IO_LineReaderCallback LineReader; @@ -26,6 +27,11 @@ struct AsyncIO { eNextState NextState; }; +typedef struct _IOAddHandler { + void *Ctx; + EventContextAttach EvAttch; +}IOAddHandler; + void FreeAsyncIOContents(AsyncIO *IO); int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB); diff --git a/citadel/modules/eventclient/serv_evventclient.c b/citadel/modules/eventclient/serv_eventclient.c similarity index 73% rename from citadel/modules/eventclient/serv_evventclient.c rename to citadel/modules/eventclient/serv_eventclient.c index bb13c9dee..68a45c725 100644 --- a/citadel/modules/eventclient/serv_evventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -57,26 +57,50 @@ #include "event_client.h" -int event_add_pipe[2]; +int event_add_pipe[2] = {-1, -1}; citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */ -void *QueueEventAddPtr = NULL; -EventContextAttach EventContextAttachPtr; -AsyncIO *QueueThisIO; HashList *QueueEvents = NULL; +HashList *InboundEventQueue = NULL; +HashList *InboundEventQueues[2] = { NULL, NULL }; + static struct event_base *event_base; struct event queue_add_event; static void QueueEventAddCallback(int fd, short event, void *ctx) { char buf[10]; + HashList *q; + void *v; + HashPos *It; + long len; + const char *Key; /* get the control command... */ read(fd, buf, 1); switch (buf[0]) { case '+': - EventContextAttachPtr(QueueEventAddPtr); + citthread_mutex_lock(&EventQueueMutex); + + if (InboundEventQueues[0] == InboundEventQueue) { + InboundEventQueue = InboundEventQueues[1]; + q = InboundEventQueues[0]; + } + else { + InboundEventQueue = InboundEventQueues[0]; + q = InboundEventQueues[1]; + } + citthread_mutex_unlock(&EventQueueMutex); + + It = GetNewHashPos(q, 0); + while (GetNextHashPos(q, It, &len, &Key, &v)) + { + IOAddHandler *h = v; + h->EvAttch(h->Ctx); + } + DeleteHashPos(&It); + DeleteHashContent(&q); /// TODO: add it to QueueEvents break; case 'x': @@ -86,32 +110,48 @@ static void QueueEventAddCallback(int fd, short event, void *ctx) event_base_loopexit(event_base, NULL); } /* Unblock the other side */ - read(fd, buf, 1); +// read(fd, buf, 1); + CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n"); } -/* - * this thread operates the select() etc. via libevent. - * - * - */ -void *client_event_thread(void *arg) { - struct CitContext libevent_client_CC; + +void InitEventQueue(void) +{ + struct rlimit LimitSet; + event_base = event_init(); /* base = event_base_new(); if (!base) - return NULL; /*XXXerr*/ + return NULL; / *XXXerr*/ citthread_mutex_init(&EventQueueMutex, NULL); - CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread"); -// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); - CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n"); - + if (pipe(event_add_pipe) != 0) { CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libevent queueing: %s\n", strerror(errno)); abort(); } - + LimitSet.rlim_cur = 1; + LimitSet.rlim_max = 1; + setrlimit(event_add_pipe[1], &LimitSet); + + QueueEvents = NewHash(1, Flathash); + InboundEventQueues[0] = NewHash(1, Flathash); + InboundEventQueues[1] = NewHash(1, Flathash); + InboundEventQueue = InboundEventQueues[0]; +} +/* + * this thread operates the select() etc. via libevent. + * + * + */ +void *client_event_thread(void *arg) +{ + struct CitContext libevent_client_CC; + CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread"); +// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); + CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n"); + event_set(&queue_add_event, event_add_pipe[0], EV_READ|EV_PERSIST, @@ -135,8 +175,8 @@ CTDL_MODULE_INIT(event_client) #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT if (!threading) { + InitEventQueue(); CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL); - QueueEvents = NewHash(1, Flathash); /// todo register shutdown callback. } #endif diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index 5082d94b4..2904e297f 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -577,7 +577,7 @@ void resolve_mx_hosts(SmtpOutMsg *SendMsg) } } -/* TODO: abort... */ + #define SMTP_ERROR(WHICH_ERR, ERRSTR) {SendMsg->MyQEntry->Status = WHICH_ERR; StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, HKEY(ERRSTR), 0); return eAbort; } #define SMTP_VERROR(WHICH_ERR) { SendMsg->MyQEntry->Status = WHICH_ERR; StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, &ChrPtr(SendMsg->IO.IOBuf)[4], -1, 0); return eAbort; } #define SMTP_IS_STATE(WHICH_STATE) (ChrPtr(SendMsg->IO.IOBuf)[0] == WHICH_STATE) @@ -623,6 +623,10 @@ void connect_one_smtpsrv(SmtpOutMsg *SendMsg) int connect_one_smtpsrv_xamine_result(void *Ctx) { SmtpOutMsg *SendMsg = Ctx; + + CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: connecting [%s:%s]!\n", + SendMsg->n, SendMsg->mx_host, SendMsg->mx_port); + SendMsg->IO.SendBuf.fd = SendMsg->IO.RecvBuf.fd = SendMsg->IO.sock = sock_connect(SendMsg->mx_host, SendMsg->mx_port); @@ -631,7 +635,7 @@ int connect_one_smtpsrv_xamine_result(void *Ctx) "Could not connect: %s", strerror(errno)); if (SendMsg->IO.sock >= 0) { - CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: connected!\n", SendMsg->n); + CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld:%ld]: connected!\n", SendMsg->n, SendMsg->IO.sock); int fdflags; fdflags = fcntl(SendMsg->IO.sock, F_GETFL); if (fdflags < 0) @@ -663,8 +667,8 @@ int connect_one_smtpsrv_xamine_result(void *Ctx) } - SendMsg->IO.SendBuf.Buf = NewStrBuf(); - SendMsg->IO.RecvBuf.Buf = NewStrBuf(); + SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); + SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); SendMsg->IO.IOBuf = NewStrBuf(); InitEventIO(&SendMsg->IO, SendMsg, SMTP_C_DispatchReadDone, @@ -1047,12 +1051,12 @@ void smtp_do_procmsg(long msgnum, void *userdata) { long len; const char *Key; - CtdlLogPrintf(CTDL_DEBUG, "SMTP client: smtp_do_procmsg(%ld)\n", msgnum); + CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum); ///strcpy(envelope_from, ""); msg = CtdlFetchMessage(msgnum, 1); if (msg == NULL) { - CtdlLogPrintf(CTDL_ERR, "SMTP client: tried %ld but no such message!\n", msgnum); + CtdlLogPrintf(CTDL_ERR, "SMTP Queue: tried %ld but no such message!\n", msgnum); return; } @@ -1072,7 +1076,7 @@ void smtp_do_procmsg(long msgnum, void *userdata) { FreeStrBuf(&PlainQItem); if (MyQItem == NULL) { - CtdlLogPrintf(CTDL_ERR, "SMTP client: Msg No %ld: already in progress!\n", msgnum); + CtdlLogPrintf(CTDL_ERR, "SMTP Queue: Msg No %ld: already in progress!\n", msgnum); return; /* s.b. else is already processing... */ } @@ -1098,7 +1102,7 @@ void smtp_do_procmsg(long msgnum, void *userdata) { * Bail out if there's no actual message associated with this */ if (MyQItem->MessageID < 0L) { - CtdlLogPrintf(CTDL_ERR, "SMTP client: no 'msgid' directive found!\n"); + CtdlLogPrintf(CTDL_ERR, "SMTP Queue: no 'msgid' directive found!\n"); It = GetNewHashPos(MyQItem->MailQEntries, 0); citthread_mutex_lock(&ActiveQItemsLock); { @@ -1111,6 +1115,14 @@ void smtp_do_procmsg(long msgnum, void *userdata) { return; } + It = GetNewHashPos(MyQItem->MailQEntries, 0); + while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE)) + { + MailQEntry *ThisItem = vQE; + CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Task: <%s> %d\n", ChrPtr(ThisItem->Recipient), ThisItem->Active); + } + DeleteHashPos(&It); + CountActiveQueueEntries(MyQItem); if (MyQItem->ActiveDeliveries > 0) { @@ -1122,7 +1134,7 @@ void smtp_do_procmsg(long msgnum, void *userdata) { { MailQEntry *ThisItem = vQE; if (ThisItem->Active == 1) { - CtdlLogPrintf(CTDL_DEBUG, "SMTP client: Trying <%s>\n", ChrPtr(ThisItem->Recipient)); + CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Trying <%s>\n", ChrPtr(ThisItem->Recipient)); smtp_try(MyQItem, ThisItem, Msg, (i == MyQItem->ActiveDeliveries)); i++; } @@ -1197,6 +1209,8 @@ void *smtp_queue_thread(void *arg) { int num_processed = 0; struct CitContext smtp_queue_CC; + CtdlThreadSleep(10); + CtdlFillSystemContext(&smtp_queue_CC, "SMTP Send"); citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); CtdlLogPrintf(CTDL_DEBUG, "smtp_queue_thread() initializing\n"); -- 2.30.2