extern int event_add_pipe[2];
extern citthread_mutex_t EventQueueMutex;
-extern void *QueueEventAddPtr;
-extern AsyncIO *QueueThisIO;
-extern EventContextAttach EventContextAttachPtr;
+extern HashList *InboundEventQueue;
+#define SEND_EVENT 1
+#define RECV_EVENT 2
+
int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB)
{
+ IOAddHandler *h;
+ int i;
+
+ h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
+ h->Ctx = Ctx;
+ h->EvAttch = CB;
+
citthread_mutex_lock(&EventQueueMutex);
if (event_add_pipe[1] == -1) {
citthread_mutex_unlock(&EventQueueMutex);
-
+ free (h);
return -1;
}
-
- QueueEventAddPtr = Ctx;
- EventContextAttachPtr = CB;
- QueueThisIO = IO;
+ CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
+ i = GetCount(InboundEventQueue);
+ Put(InboundEventQueue, IKEY(i), h, NULL);
+ citthread_mutex_unlock(&EventQueueMutex);
write(event_add_pipe[1], "+_", 1);
- citthread_mutex_unlock(&EventQueueMutex);
+ CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n");
return 0;
}
}
/*
-static void
-setup_signal_handlers(struct instance *instance)
-{
- signal(SIGPIPE, SIG_IGN);
-
- event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST,
- exit_event_callback, instance);
- event_add(&instance->sigterm_event, NULL);
-
- event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST,
- exit_event_callback, instance);
- event_add(&instance->sigint_event, NULL);
-
- event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST,
- exit_event_callback, instance);
- event_add(&instance->sigquit_event, NULL);
-}
+ static void
+ setup_signal_handlers(struct instance *instance)
+ {
+ signal(SIGPIPE, SIG_IGN);
+
+ event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST,
+ exit_event_callback, instance);
+ event_add(&instance->sigterm_event, NULL);
+
+ event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST,
+ exit_event_callback, instance);
+ event_add(&instance->sigint_event, NULL);
+
+ event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST,
+ exit_event_callback, instance);
+ event_add(&instance->sigquit_event, NULL);
+ }
*/
void ShutDownCLient(AsyncIO *IO)
{
- event_del(&IO->send_event);
- event_del(&IO->recv_event);
+ CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock);
+ switch (IO->active_event) {
+ case SEND_EVENT:
+ event_del(&IO->send_event);
+ break;
+ case RECV_EVENT:
+ event_del(&IO->recv_event);
+ break;
+ case 0:
+ // no event active here; just bail out.
+ break;
+ }
+ IO->active_event = 0;
IO->Terminate(IO->Data);
// citthread_mutex_lock(&EventQueueMutex);
if (Finished != eMustReadMore) {
event_del(&IO->recv_event);
+ IO->active_event = 0;
IO->NextState = IO->ReadDone(IO->Data);
Finished = StrBufCheckBuffer(&IO->RecvBuf);
}
{
IO->NextState = IO->SendDone(IO->Data);
event_add(&IO->send_event, NULL);
+ IO->active_event = SEND_EVENT;
}
else if ((IO->NextState == eTerminateConnection) ||
(void)fd;
(void)event;
-
+ CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d : [%s%s%s%s]\n",
+ (int) fd,
+ (event&EV_TIMEOUT) ? " timeout" : "",
+ (event&EV_READ) ? " read" : "",
+ (event&EV_WRITE) ? " write" : "",
+ (event&EV_SIGNAL) ? " signal" : "");
+
/// assert(fd == IO->sock);
rc = StrBuf_write_one_chunk_callback(fd, event, &IO->SendBuf);
if (rc == 0)
{
- event_del(&IO->send_event);
- switch (IO->NextState) {
- case eSendReply:
- break;
- case eSendMore:
- IO->NextState = IO->SendDone(IO->Data);
-
- if ((IO->NextState == eTerminateConnection) ||
- (IO->NextState == eAbort) )
- ShutDownCLient(IO);
- else
- event_add(&IO->send_event, NULL);
- break;
- case eReadMessage:
- if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
- HandleInbound(IO);
- }
- else {
- event_add(&IO->recv_event, NULL);
- }
-
- break;
- case eAbort:
- break;
- }
+
+#ifdef BIGBAD_IODBG
+ {
+ int rv = 0;
+ char fn [SIZ];
+ FILE *fd;
+ const char *pch = ChrPtr(IO->SendBuf.Buf);
+ const char *pchh = IO->SendBuf.ReadWritePointer;
+ long nbytes;
+
+ if (pchh == NULL)
+ pchh = pch;
+
+ nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
+ snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", "smtpev", IO->sock);
+
+ fd = fopen(fn, "a+");
+ fprintf(fd, "Read: BufSize: %ld BufContent: [",
+ nbytes);
+ rv = fwrite(pchh, nbytes, 1, fd);
+ fprintf(fd, "]\n");
+
+
+ fclose(fd);
+ }
+#endif
+ event_del(&IO->send_event);
+ IO->active_event = 0;
+ switch (IO->NextState) {
+ case eSendReply:
+ break;
+ case eSendMore:
+ IO->NextState = IO->SendDone(IO->Data);
+
+ if ((IO->NextState == eTerminateConnection) ||
+ (IO->NextState == eAbort) )
+ ShutDownCLient(IO);
+ else {
+ event_add(&IO->send_event, NULL);
+ IO->active_event = SEND_EVENT;
+ }
+ break;
+ case eReadMessage:
+ if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
+ HandleInbound(IO);
+ }
+ else {
+ event_add(&IO->recv_event, NULL);
+ IO->active_event = RECV_EVENT;
+ }
+
+ break;
+ case eAbort:
+ break;
+ }
}
else if (rc > 0)
- return;
+ ShutDownCLient(IO);
// else
///abort!
}
// assert(fd == sb->fd);
+ CtdlLogPrintf(CTDL_DEBUG, "EVENT <- %d : [%s%s%s%s]\n",
+ (int) fd,
+ (event&EV_TIMEOUT) ? " timeout" : "",
+ (event&EV_READ) ? " read" : "",
+ (event&EV_WRITE) ? " write" : "",
+ (event&EV_SIGNAL) ? " signal" : "");
nbytes = StrBuf_read_one_chunk_callback(fd, event, &IO->RecvBuf);
if (nbytes > 0) {
HandleInbound(IO);
} else if (nbytes == 0) {
- /// TODO: this is a timeout??? sock_buff_invoke_free(sb, 0);
+ ShutDownCLient(IO);
+/// TODO: this is a timeout??? sock_buff_invoke_free(sb, 0); seems as if socket is gone then?
return;
} else if (nbytes == -1) {
/// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno);
}
}
-void IOReadNextLine(AsyncIO *IO, int timeout)
-{
-
-}
-
-void IOReadNextBLOB(AsyncIO *IO, int timeout, long size)
-{
-}
-
void InitEventIO(AsyncIO *IO,
void *pData,
IO_CallBack ReadDone,
if (ReadFirst) {
IO->NextState = eReadMessage;
event_add(&IO->recv_event, NULL);
+ IO->active_event = RECV_EVENT;
}
else {
IO->NextState = eSendReply;
struct AsyncIO {
int sock;
+ int active_event;
struct event recv_event, send_event;
IOBuffer SendBuf, RecvBuf;
IO_LineReaderCallback LineReader;
eNextState NextState;
};
+typedef struct _IOAddHandler {
+ void *Ctx;
+ EventContextAttach EvAttch;
+}IOAddHandler;
+
void FreeAsyncIOContents(AsyncIO *IO);
int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB);
--- /dev/null
+/*
+ * Copyright (c) 1998-2009 by the citadel.org team
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include "sysdep.h"
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <termios.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <pwd.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <syslog.h>
+
+#if TIME_WITH_SYS_TIME
+# include <sys/time.h>
+# include <time.h>
+#else
+# if HAVE_SYS_TIME_H
+# include <sys/time.h>
+# else
+# include <time.h>
+# endif
+#endif
+#include <sys/wait.h>
+#include <ctype.h>
+#include <string.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <libcitadel.h>
+#include "citadel.h"
+#include "server.h"
+#include "citserver.h"
+#include "support.h"
+
+#include "ctdl_module.h"
+
+#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
+
+#include "event_client.h"
+
+int event_add_pipe[2] = {-1, -1};
+
+citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
+HashList *QueueEvents = NULL;
+
+HashList *InboundEventQueue = NULL;
+HashList *InboundEventQueues[2] = { NULL, NULL };
+
+static struct event_base *event_base;
+struct event queue_add_event;
+
+static void QueueEventAddCallback(int fd, short event, void *ctx)
+{
+ char buf[10];
+ HashList *q;
+ void *v;
+ HashPos *It;
+ long len;
+ const char *Key;
+
+ /* get the control command... */
+ read(fd, buf, 1);
+ switch (buf[0]) {
+ case '+':
+ citthread_mutex_lock(&EventQueueMutex);
+
+ if (InboundEventQueues[0] == InboundEventQueue) {
+ InboundEventQueue = InboundEventQueues[1];
+ q = InboundEventQueues[0];
+ }
+ else {
+ InboundEventQueue = InboundEventQueues[0];
+ q = InboundEventQueues[1];
+ }
+ citthread_mutex_unlock(&EventQueueMutex);
+
+ It = GetNewHashPos(q, 0);
+ while (GetNextHashPos(q, It, &len, &Key, &v))
+ {
+ IOAddHandler *h = v;
+ h->EvAttch(h->Ctx);
+ }
+ DeleteHashPos(&It);
+ DeleteHashContent(&q);
+/// TODO: add it to QueueEvents
+ break;
+ case 'x':
+ event_del(&queue_add_event);
+ close(event_add_pipe[0]);
+/// TODO; flush QueueEvents fd's and delete it.
+ event_base_loopexit(event_base, NULL);
+ }
+ /* Unblock the other side */
+// read(fd, buf, 1);
+ CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
+}
+
+
+void InitEventQueue(void)
+{
+ struct rlimit LimitSet;
+
+ event_base = event_init();
+/*
+ base = event_base_new();
+ if (!base)
+ return NULL; / *XXXerr*/
+
+ citthread_mutex_init(&EventQueueMutex, NULL);
+
+ if (pipe(event_add_pipe) != 0) {
+ CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libevent queueing: %s\n", strerror(errno));
+ abort();
+ }
+ LimitSet.rlim_cur = 1;
+ LimitSet.rlim_max = 1;
+ setrlimit(event_add_pipe[1], &LimitSet);
+
+ QueueEvents = NewHash(1, Flathash);
+ InboundEventQueues[0] = NewHash(1, Flathash);
+ InboundEventQueues[1] = NewHash(1, Flathash);
+ InboundEventQueue = InboundEventQueues[0];
+}
+/*
+ * this thread operates the select() etc. via libevent.
+ *
+ *
+ */
+void *client_event_thread(void *arg)
+{
+ struct CitContext libevent_client_CC;
+ CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread");
+// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+ CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n");
+
+ event_set(&queue_add_event,
+ event_add_pipe[0],
+ EV_READ|EV_PERSIST,
+ QueueEventAddCallback,
+ NULL);
+
+ event_add(&queue_add_event, NULL);
+
+
+ event_dispatch();
+ CtdlClearSystemContext();
+ event_base_free(event_base);
+ citthread_mutex_destroy(&EventQueueMutex);
+ return(NULL);
+}
+
+#endif
+
+CTDL_MODULE_INIT(event_client)
+{
+#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
+ if (!threading)
+ {
+ InitEventQueue();
+ CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
+/// todo register shutdown callback.
+ }
+#endif
+ return "event";
+}
+++ /dev/null
-/*
- * Copyright (c) 1998-2009 by the citadel.org team
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
-
-#include "sysdep.h"
-#include <stdlib.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <termios.h>
-#include <fcntl.h>
-#include <signal.h>
-#include <pwd.h>
-#include <errno.h>
-#include <sys/types.h>
-#include <syslog.h>
-
-#if TIME_WITH_SYS_TIME
-# include <sys/time.h>
-# include <time.h>
-#else
-# if HAVE_SYS_TIME_H
-# include <sys/time.h>
-# else
-# include <time.h>
-# endif
-#endif
-#include <sys/wait.h>
-#include <ctype.h>
-#include <string.h>
-#include <limits.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <libcitadel.h>
-#include "citadel.h"
-#include "server.h"
-#include "citserver.h"
-#include "support.h"
-
-#include "ctdl_module.h"
-
-#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
-
-#include "event_client.h"
-
-int event_add_pipe[2];
-
-citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
-void *QueueEventAddPtr = NULL;
-EventContextAttach EventContextAttachPtr;
-AsyncIO *QueueThisIO;
-HashList *QueueEvents = NULL;
-
-static struct event_base *event_base;
-struct event queue_add_event;
-
-static void QueueEventAddCallback(int fd, short event, void *ctx)
-{
- char buf[10];
-
- /* get the control command... */
- read(fd, buf, 1);
- switch (buf[0]) {
- case '+':
- EventContextAttachPtr(QueueEventAddPtr);
-/// TODO: add it to QueueEvents
- break;
- case 'x':
- event_del(&queue_add_event);
- close(event_add_pipe[0]);
-/// TODO; flush QueueEvents fd's and delete it.
- event_base_loopexit(event_base, NULL);
- }
- /* Unblock the other side */
- read(fd, buf, 1);
-}
-
-/*
- * this thread operates the select() etc. via libevent.
- *
- *
- */
-void *client_event_thread(void *arg) {
- struct CitContext libevent_client_CC;
- event_base = event_init();
-/*
- base = event_base_new();
- if (!base)
- return NULL; /*XXXerr*/
-
- citthread_mutex_init(&EventQueueMutex, NULL);
- CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread");
-// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
- CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n");
-
- if (pipe(event_add_pipe) != 0) {
- CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libevent queueing: %s\n", strerror(errno));
- abort();
- }
-
- event_set(&queue_add_event,
- event_add_pipe[0],
- EV_READ|EV_PERSIST,
- QueueEventAddCallback,
- NULL);
-
- event_add(&queue_add_event, NULL);
-
-
- event_dispatch();
- CtdlClearSystemContext();
- event_base_free(event_base);
- citthread_mutex_destroy(&EventQueueMutex);
- return(NULL);
-}
-
-#endif
-
-CTDL_MODULE_INIT(event_client)
-{
-#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
- if (!threading)
- {
- CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
- QueueEvents = NewHash(1, Flathash);
-/// todo register shutdown callback.
- }
-#endif
- return "event";
-}
}
}
-/* TODO: abort... */
+
#define SMTP_ERROR(WHICH_ERR, ERRSTR) {SendMsg->MyQEntry->Status = WHICH_ERR; StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, HKEY(ERRSTR), 0); return eAbort; }
#define SMTP_VERROR(WHICH_ERR) { SendMsg->MyQEntry->Status = WHICH_ERR; StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, &ChrPtr(SendMsg->IO.IOBuf)[4], -1, 0); return eAbort; }
#define SMTP_IS_STATE(WHICH_STATE) (ChrPtr(SendMsg->IO.IOBuf)[0] == WHICH_STATE)
int connect_one_smtpsrv_xamine_result(void *Ctx)
{
SmtpOutMsg *SendMsg = Ctx;
+
+ CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: connecting [%s:%s]!\n",
+ SendMsg->n, SendMsg->mx_host, SendMsg->mx_port);
+
SendMsg->IO.SendBuf.fd =
SendMsg->IO.RecvBuf.fd =
SendMsg->IO.sock = sock_connect(SendMsg->mx_host, SendMsg->mx_port);
"Could not connect: %s", strerror(errno));
if (SendMsg->IO.sock >= 0)
{
- CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: connected!\n", SendMsg->n);
+ CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld:%ld]: connected!\n", SendMsg->n, SendMsg->IO.sock);
int fdflags;
fdflags = fcntl(SendMsg->IO.sock, F_GETFL);
if (fdflags < 0)
}
- SendMsg->IO.SendBuf.Buf = NewStrBuf();
- SendMsg->IO.RecvBuf.Buf = NewStrBuf();
+ SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024);
+ SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
SendMsg->IO.IOBuf = NewStrBuf();
InitEventIO(&SendMsg->IO, SendMsg,
SMTP_C_DispatchReadDone,
long len;
const char *Key;
- CtdlLogPrintf(CTDL_DEBUG, "SMTP client: smtp_do_procmsg(%ld)\n", msgnum);
+ CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum);
///strcpy(envelope_from, "");
msg = CtdlFetchMessage(msgnum, 1);
if (msg == NULL) {
- CtdlLogPrintf(CTDL_ERR, "SMTP client: tried %ld but no such message!\n", msgnum);
+ CtdlLogPrintf(CTDL_ERR, "SMTP Queue: tried %ld but no such message!\n", msgnum);
return;
}
FreeStrBuf(&PlainQItem);
if (MyQItem == NULL) {
- CtdlLogPrintf(CTDL_ERR, "SMTP client: Msg No %ld: already in progress!\n", msgnum);
+ CtdlLogPrintf(CTDL_ERR, "SMTP Queue: Msg No %ld: already in progress!\n", msgnum);
return; /* s.b. else is already processing... */
}
* Bail out if there's no actual message associated with this
*/
if (MyQItem->MessageID < 0L) {
- CtdlLogPrintf(CTDL_ERR, "SMTP client: no 'msgid' directive found!\n");
+ CtdlLogPrintf(CTDL_ERR, "SMTP Queue: no 'msgid' directive found!\n");
It = GetNewHashPos(MyQItem->MailQEntries, 0);
citthread_mutex_lock(&ActiveQItemsLock);
{
return;
}
+ It = GetNewHashPos(MyQItem->MailQEntries, 0);
+ while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
+ {
+ MailQEntry *ThisItem = vQE;
+ CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Task: <%s> %d\n", ChrPtr(ThisItem->Recipient), ThisItem->Active);
+ }
+ DeleteHashPos(&It);
+
CountActiveQueueEntries(MyQItem);
if (MyQItem->ActiveDeliveries > 0)
{
{
MailQEntry *ThisItem = vQE;
if (ThisItem->Active == 1) {
- CtdlLogPrintf(CTDL_DEBUG, "SMTP client: Trying <%s>\n", ChrPtr(ThisItem->Recipient));
+ CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Trying <%s>\n", ChrPtr(ThisItem->Recipient));
smtp_try(MyQItem, ThisItem, Msg, (i == MyQItem->ActiveDeliveries));
i++;
}
int num_processed = 0;
struct CitContext smtp_queue_CC;
+ CtdlThreadSleep(10);
+
CtdlFillSystemContext(&smtp_queue_CC, "SMTP Send");
citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
CtdlLogPrintf(CTDL_DEBUG, "smtp_queue_thread() initializing\n");