Libevent Migration
authorWilfried Goesgens <dothebart@citadel.org>
Mon, 27 Dec 2010 15:45:53 +0000 (16:45 +0100)
committerWilfried Goesgens <dothebart@citadel.org>
Mon, 27 Dec 2010 15:45:53 +0000 (16:45 +0100)
  - use a bigger read buffer for the smtp client
  - differentiate between queue and smtp client in log messages
  - migrate scheduling of jobs to a locked list

citadel/event_client.c
citadel/event_client.h
citadel/modules/eventclient/serv_eventclient.c [new file with mode: 0644]
citadel/modules/eventclient/serv_evventclient.c [deleted file]
citadel/modules/smtp/serv_smtpeventclient.c

index ba18107d76c1307c46ab7754c4c39796eb28dc02..8484cad4cfb9437e52682fcebce43655755e4bfe 100644 (file)
 
 extern int event_add_pipe[2];
 extern citthread_mutex_t EventQueueMutex;
-extern void *QueueEventAddPtr;
-extern AsyncIO *QueueThisIO;
-extern EventContextAttach EventContextAttachPtr;
+extern HashList *InboundEventQueue;
 
+#define SEND_EVENT 1
+#define RECV_EVENT 2
+       
 int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB)
 {
+       IOAddHandler *h;
+       int i;
+
+       h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
+       h->Ctx = Ctx;
+       h->EvAttch = CB;
+
        citthread_mutex_lock(&EventQueueMutex);
        if (event_add_pipe[1] == -1) {
                citthread_mutex_unlock(&EventQueueMutex);
-
+               free (h);
                return -1;
        }
-
-       QueueEventAddPtr = Ctx;
-       EventContextAttachPtr = CB;
-       QueueThisIO = IO;
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
+       i = GetCount(InboundEventQueue);
+       Put(InboundEventQueue, IKEY(i), h, NULL);
+       citthread_mutex_unlock(&EventQueueMutex);
 
        write(event_add_pipe[1], "+_", 1);
-       citthread_mutex_unlock(&EventQueueMutex);
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n");
        return 0;
 }
 
@@ -117,29 +125,40 @@ void FreeAsyncIOContents(AsyncIO *IO)
 }
 
 /*
-static void
-setup_signal_handlers(struct instance *instance)
-{
-    signal(SIGPIPE, SIG_IGN);
-
-    event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST,
-              exit_event_callback, instance);
-    event_add(&instance->sigterm_event, NULL);
-
-    event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST,
-              exit_event_callback, instance);
-    event_add(&instance->sigint_event, NULL);
-
-    event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST,
-              exit_event_callback, instance);
-    event_add(&instance->sigquit_event, NULL);
-}
+  static void
+  setup_signal_handlers(struct instance *instance)
+  {
+  signal(SIGPIPE, SIG_IGN);
+
+  event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST,
+  exit_event_callback, instance);
+  event_add(&instance->sigterm_event, NULL);
+
+  event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST,
+  exit_event_callback, instance);
+  event_add(&instance->sigint_event, NULL);
+
+  event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST,
+  exit_event_callback, instance);
+  event_add(&instance->sigquit_event, NULL);
+  }
 */
 
 void ShutDownCLient(AsyncIO *IO)
 {
-       event_del(&IO->send_event);
-       event_del(&IO->recv_event);
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock);
+       switch (IO->active_event) {
+       case SEND_EVENT:
+               event_del(&IO->send_event);
+               break;
+       case RECV_EVENT:
+               event_del(&IO->recv_event);
+               break;
+       case 0:
+               // no event active here; just bail out.
+               break;
+       }
+       IO->active_event = 0;
        IO->Terminate(IO->Data);
 
 //     citthread_mutex_lock(&EventQueueMutex);
@@ -180,6 +199,7 @@ eReadState HandleInbound(AsyncIO *IO)
                        
                if (Finished != eMustReadMore) {
                        event_del(&IO->recv_event);
+                       IO->active_event = 0;
                        IO->NextState = IO->ReadDone(IO->Data);
                        Finished = StrBufCheckBuffer(&IO->RecvBuf);
                }
@@ -191,6 +211,7 @@ eReadState HandleInbound(AsyncIO *IO)
        {
                IO->NextState = IO->SendDone(IO->Data);
                event_add(&IO->send_event, NULL);
+               IO->active_event = SEND_EVENT;
                        
        }
        else if ((IO->NextState == eTerminateConnection) ||
@@ -208,41 +229,77 @@ IO_send_callback(int fd, short event, void *ctx)
 
        (void)fd;
        (void)event;
-       
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d  : [%s%s%s%s]\n",
+                     (int) fd,
+                     (event&EV_TIMEOUT) ? " timeout" : "",
+                     (event&EV_READ)    ? " read" : "",
+                     (event&EV_WRITE)   ? " write" : "",
+                     (event&EV_SIGNAL)  ? " signal" : "");
+
 ///    assert(fd == IO->sock);
        
        rc = StrBuf_write_one_chunk_callback(fd, event, &IO->SendBuf);
 
        if (rc == 0)
        {
-           event_del(&IO->send_event);
-           switch (IO->NextState) {
-           case eSendReply:
-                   break;
-           case eSendMore:
-                   IO->NextState = IO->SendDone(IO->Data);
-
-                   if ((IO->NextState == eTerminateConnection) ||
-                            (IO->NextState == eAbort) )
-                           ShutDownCLient(IO);
-                   else
-                           event_add(&IO->send_event, NULL);
-                   break;
-           case eReadMessage:
-                   if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
-                           HandleInbound(IO);
-                   }
-                   else {
-                           event_add(&IO->recv_event, NULL);
-                   }
-
-                   break;
-           case eAbort:
-                   break;
-           }
+               
+#ifdef BIGBAD_IODBG
+               {
+                       int rv = 0;
+                       char fn [SIZ];
+                       FILE *fd;
+                       const char *pch = ChrPtr(IO->SendBuf.Buf);
+                       const char *pchh = IO->SendBuf.ReadWritePointer;
+                       long nbytes;
+
+                       if (pchh == NULL)
+                               pchh = pch;
+                       
+                       nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
+                       snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", "smtpev", IO->sock);
+               
+                       fd = fopen(fn, "a+");
+                       fprintf(fd, "Read: BufSize: %ld BufContent: [",
+                               nbytes);
+                       rv = fwrite(pchh, nbytes, 1, fd);
+                       fprintf(fd, "]\n");
+               
+                       
+                       fclose(fd);
+               }
+#endif
+               event_del(&IO->send_event);
+               IO->active_event = 0;
+               switch (IO->NextState) {
+               case eSendReply:
+                       break;
+               case eSendMore:
+                       IO->NextState = IO->SendDone(IO->Data);
+
+                       if ((IO->NextState == eTerminateConnection) ||
+                           (IO->NextState == eAbort) )
+                               ShutDownCLient(IO);
+                       else {
+                               event_add(&IO->send_event, NULL);
+                               IO->active_event = SEND_EVENT;
+                       }
+                       break;
+               case eReadMessage:
+                       if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
+                               HandleInbound(IO);
+                       }
+                       else {
+                               event_add(&IO->recv_event, NULL);
+                               IO->active_event = RECV_EVENT;
+                       }
+
+                       break;
+               case eAbort:
+                       break;
+               }
        }
        else if (rc > 0)
-               return;
+               ShutDownCLient(IO);
 //     else 
                ///abort!
 }
@@ -258,11 +315,18 @@ IO_recv_callback(int fd, short event, void *ctx)
        
 //    assert(fd == sb->fd);
 
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT <- %d  : [%s%s%s%s]\n",
+            (int) fd,
+            (event&EV_TIMEOUT) ? " timeout" : "",
+            (event&EV_READ)    ? " read" : "",
+            (event&EV_WRITE)   ? " write" : "",
+            (event&EV_SIGNAL)  ? " signal" : "");
        nbytes = StrBuf_read_one_chunk_callback(fd, event, &IO->RecvBuf);
        if (nbytes > 0) {
                HandleInbound(IO);
        } else if (nbytes == 0) {
-               ///  TODO: this is a timeout???  sock_buff_invoke_free(sb, 0);
+               ShutDownCLient(IO);
+///  TODO: this is a timeout???  sock_buff_invoke_free(sb, 0); seems as if socket is gone then?
                return;
        } else if (nbytes == -1) {
 /// TODO: FD is gone. kick it.        sock_buff_invoke_free(sb, errno);
@@ -270,15 +334,6 @@ IO_recv_callback(int fd, short event, void *ctx)
        }
 }
 
-void IOReadNextLine(AsyncIO *IO, int timeout)
-{
-
-}
-
-void IOReadNextBLOB(AsyncIO *IO, int timeout, long size)
-{
-}
-
 void InitEventIO(AsyncIO *IO, 
                 void *pData, 
                 IO_CallBack ReadDone, 
@@ -308,6 +363,7 @@ void InitEventIO(AsyncIO *IO,
        if (ReadFirst) {
                IO->NextState = eReadMessage;
                event_add(&IO->recv_event, NULL);
+               IO->active_event = RECV_EVENT;
        }
        else {
                IO->NextState = eSendReply;
index f20152f8d933319477d08bd571e1163ece28efc0..34023a3c0967211ab80618cbf0fd729c1e50f5f3 100644 (file)
@@ -16,6 +16,7 @@ typedef eReadState (*IO_LineReaderCallback)(AsyncIO *IO);
 
 struct AsyncIO {
        int sock;
+       int active_event;
        struct event recv_event, send_event;
        IOBuffer SendBuf, RecvBuf;
        IO_LineReaderCallback LineReader;
@@ -26,6 +27,11 @@ struct AsyncIO {
                eNextState NextState;
 };
 
+typedef struct _IOAddHandler {
+       void *Ctx;
+       EventContextAttach EvAttch;
+}IOAddHandler; 
+
 void FreeAsyncIOContents(AsyncIO *IO);
 
 int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB);
diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c
new file mode 100644 (file)
index 0000000..68a45c7
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 1998-2009 by the citadel.org team
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include "sysdep.h"
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <termios.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <pwd.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <syslog.h>
+
+#if TIME_WITH_SYS_TIME
+# include <sys/time.h>
+# include <time.h>
+#else
+# if HAVE_SYS_TIME_H
+#  include <sys/time.h>
+# else
+#  include <time.h>
+# endif
+#endif
+#include <sys/wait.h>
+#include <ctype.h>
+#include <string.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <libcitadel.h>
+#include "citadel.h"
+#include "server.h"
+#include "citserver.h"
+#include "support.h"
+
+#include "ctdl_module.h"
+
+#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
+
+#include "event_client.h"
+
+int event_add_pipe[2] = {-1, -1};
+
+citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
+HashList *QueueEvents = NULL;
+
+HashList *InboundEventQueue = NULL;
+HashList *InboundEventQueues[2] = { NULL, NULL };
+
+static struct event_base *event_base;
+struct event queue_add_event;
+
+static void QueueEventAddCallback(int fd, short event, void *ctx)
+{
+       char buf[10];
+       HashList *q;
+       void *v;
+       HashPos  *It;
+       long len;
+       const char *Key;
+
+       /* get the control command... */
+       read(fd, buf, 1);
+       switch (buf[0]) {
+       case '+':
+               citthread_mutex_lock(&EventQueueMutex);
+
+               if (InboundEventQueues[0] == InboundEventQueue) {
+                       InboundEventQueue = InboundEventQueues[1];
+                       q = InboundEventQueues[0];
+               }
+               else {
+                       InboundEventQueue = InboundEventQueues[0];
+                       q = InboundEventQueues[1];
+               }
+               citthread_mutex_unlock(&EventQueueMutex);
+
+               It = GetNewHashPos(q, 0);
+               while (GetNextHashPos(q, It, &len, &Key, &v))
+               {
+                       IOAddHandler *h = v;
+                       h->EvAttch(h->Ctx);
+               }
+               DeleteHashPos(&It);
+               DeleteHashContent(&q);
+/// TODO: add it to QueueEvents
+               break;
+       case 'x':
+               event_del(&queue_add_event);
+               close(event_add_pipe[0]);
+/// TODO; flush QueueEvents fd's and delete it.
+               event_base_loopexit(event_base, NULL);
+       }
+       /* Unblock the other side */
+//     read(fd, buf, 1);
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
+}
+
+
+void InitEventQueue(void)
+{
+       struct rlimit LimitSet;
+
+       event_base = event_init();
+/*
+       base = event_base_new();
+       if (!base)
+               return NULL; / *XXXerr*/
+
+       citthread_mutex_init(&EventQueueMutex, NULL);
+
+       if (pipe(event_add_pipe) != 0) {
+               CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libevent queueing: %s\n", strerror(errno));
+               abort();
+       }
+       LimitSet.rlim_cur = 1;
+       LimitSet.rlim_max = 1;
+       setrlimit(event_add_pipe[1], &LimitSet);
+
+       QueueEvents = NewHash(1, Flathash);
+       InboundEventQueues[0] = NewHash(1, Flathash);
+       InboundEventQueues[1] = NewHash(1, Flathash);
+       InboundEventQueue = InboundEventQueues[0];
+}
+/*
+ * this thread operates the select() etc. via libevent.
+ * 
+ * 
+ */
+void *client_event_thread(void *arg) 
+{
+       struct CitContext libevent_client_CC;
+       CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread");
+//     citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+       CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n");
+       
+       event_set(&queue_add_event, 
+                 event_add_pipe[0], 
+                 EV_READ|EV_PERSIST,
+                 QueueEventAddCallback, 
+                 NULL);
+       
+       event_add(&queue_add_event, NULL);
+
+
+       event_dispatch();
+       CtdlClearSystemContext();
+       event_base_free(event_base);
+       citthread_mutex_destroy(&EventQueueMutex);
+       return(NULL);
+}
+
+#endif
+
+CTDL_MODULE_INIT(event_client)
+{
+#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
+       if (!threading)
+       {
+               InitEventQueue();
+               CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
+/// todo register shutdown callback.
+       }
+#endif
+       return "event";
+}
diff --git a/citadel/modules/eventclient/serv_evventclient.c b/citadel/modules/eventclient/serv_evventclient.c
deleted file mode 100644 (file)
index bb13c9d..0000000
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright (c) 1998-2009 by the citadel.org team
- *
- *  This program is free software; you can redistribute it and/or modify
- *  it under the terms of the GNU General Public License as published by
- *  the Free Software Foundation; either version 3 of the License, or
- *  (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful,
- *  but WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *  GNU General Public License for more details.
- *
- *  You should have received a copy of the GNU General Public License
- *  along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- */
-
-#include "sysdep.h"
-#include <stdlib.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <termios.h>
-#include <fcntl.h>
-#include <signal.h>
-#include <pwd.h>
-#include <errno.h>
-#include <sys/types.h>
-#include <syslog.h>
-
-#if TIME_WITH_SYS_TIME
-# include <sys/time.h>
-# include <time.h>
-#else
-# if HAVE_SYS_TIME_H
-#  include <sys/time.h>
-# else
-#  include <time.h>
-# endif
-#endif
-#include <sys/wait.h>
-#include <ctype.h>
-#include <string.h>
-#include <limits.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <libcitadel.h>
-#include "citadel.h"
-#include "server.h"
-#include "citserver.h"
-#include "support.h"
-
-#include "ctdl_module.h"
-
-#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
-
-#include "event_client.h"
-
-int event_add_pipe[2];
-
-citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
-void *QueueEventAddPtr = NULL;
-EventContextAttach EventContextAttachPtr;
-AsyncIO *QueueThisIO;
-HashList *QueueEvents = NULL;
-
-static struct event_base *event_base;
-struct event queue_add_event;
-
-static void QueueEventAddCallback(int fd, short event, void *ctx)
-{
-       char buf[10];
-
-       /* get the control command... */
-       read(fd, buf, 1);
-       switch (buf[0]) {
-       case '+':
-               EventContextAttachPtr(QueueEventAddPtr);
-/// TODO: add it to QueueEvents
-               break;
-       case 'x':
-               event_del(&queue_add_event);
-               close(event_add_pipe[0]);
-/// TODO; flush QueueEvents fd's and delete it.
-               event_base_loopexit(event_base, NULL);
-       }
-       /* Unblock the other side */
-       read(fd, buf, 1);
-}
-
-/*
- * this thread operates the select() etc. via libevent.
- * 
- * 
- */
-void *client_event_thread(void *arg) {
-       struct CitContext libevent_client_CC;
-       event_base = event_init();
-/*
-       base = event_base_new();
-       if (!base)
-               return NULL; /*XXXerr*/
-
-       citthread_mutex_init(&EventQueueMutex, NULL);
-       CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread");
-//     citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
-       CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n");
-       
-       if (pipe(event_add_pipe) != 0) {
-               CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libevent queueing: %s\n", strerror(errno));
-               abort();
-       }
-
-       event_set(&queue_add_event, 
-                 event_add_pipe[0], 
-                 EV_READ|EV_PERSIST,
-                 QueueEventAddCallback, 
-                 NULL);
-       
-       event_add(&queue_add_event, NULL);
-
-
-       event_dispatch();
-       CtdlClearSystemContext();
-       event_base_free(event_base);
-       citthread_mutex_destroy(&EventQueueMutex);
-       return(NULL);
-}
-
-#endif
-
-CTDL_MODULE_INIT(event_client)
-{
-#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
-       if (!threading)
-       {
-               CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
-               QueueEvents = NewHash(1, Flathash);
-/// todo register shutdown callback.
-       }
-#endif
-       return "event";
-}
index 5082d94b4eb9a0be51da0264fb3ac08df83ad445..2904e297f4eec92de60d42890e2da4dd89575957 100644 (file)
@@ -577,7 +577,7 @@ void resolve_mx_hosts(SmtpOutMsg *SendMsg)
        }
 
 }
-/* TODO: abort... */
+
 #define SMTP_ERROR(WHICH_ERR, ERRSTR) {SendMsg->MyQEntry->Status = WHICH_ERR; StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, HKEY(ERRSTR), 0); return eAbort; }
 #define SMTP_VERROR(WHICH_ERR) { SendMsg->MyQEntry->Status = WHICH_ERR; StrBufAppendBufPlain(SendMsg->MyQEntry->StatusMessage, &ChrPtr(SendMsg->IO.IOBuf)[4], -1, 0); return eAbort; }
 #define SMTP_IS_STATE(WHICH_STATE) (ChrPtr(SendMsg->IO.IOBuf)[0] == WHICH_STATE)
@@ -623,6 +623,10 @@ void connect_one_smtpsrv(SmtpOutMsg *SendMsg)
 int connect_one_smtpsrv_xamine_result(void *Ctx)
 {
        SmtpOutMsg *SendMsg = Ctx;
+
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: connecting [%s:%s]!\n", 
+                     SendMsg->n, SendMsg->mx_host, SendMsg->mx_port);
+
        SendMsg->IO.SendBuf.fd = 
        SendMsg->IO.RecvBuf.fd = 
        SendMsg->IO.sock = sock_connect(SendMsg->mx_host, SendMsg->mx_port);
@@ -631,7 +635,7 @@ int connect_one_smtpsrv_xamine_result(void *Ctx)
                     "Could not connect: %s", strerror(errno));
        if (SendMsg->IO.sock >= 0) 
        {
-               CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: connected!\n", SendMsg->n);
+               CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld:%ld]: connected!\n", SendMsg->n, SendMsg->IO.sock);
                int fdflags; 
                fdflags = fcntl(SendMsg->IO.sock, F_GETFL);
                if (fdflags < 0)
@@ -663,8 +667,8 @@ int connect_one_smtpsrv_xamine_result(void *Ctx)
        }
 
 
-       SendMsg->IO.SendBuf.Buf = NewStrBuf();
-       SendMsg->IO.RecvBuf.Buf = NewStrBuf();
+       SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024);
+       SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
        SendMsg->IO.IOBuf = NewStrBuf();
        InitEventIO(&SendMsg->IO, SendMsg, 
                    SMTP_C_DispatchReadDone, 
@@ -1047,12 +1051,12 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
        long len;
        const char *Key;
 
-       CtdlLogPrintf(CTDL_DEBUG, "SMTP client: smtp_do_procmsg(%ld)\n", msgnum);
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: smtp_do_procmsg(%ld)\n", msgnum);
        ///strcpy(envelope_from, "");
 
        msg = CtdlFetchMessage(msgnum, 1);
        if (msg == NULL) {
-               CtdlLogPrintf(CTDL_ERR, "SMTP client: tried %ld but no such message!\n", msgnum);
+               CtdlLogPrintf(CTDL_ERR, "SMTP Queue: tried %ld but no such message!\n", msgnum);
                return;
        }
 
@@ -1072,7 +1076,7 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
        FreeStrBuf(&PlainQItem);
 
        if (MyQItem == NULL) {
-               CtdlLogPrintf(CTDL_ERR, "SMTP client: Msg No %ld: already in progress!\n", msgnum);             
+               CtdlLogPrintf(CTDL_ERR, "SMTP Queue: Msg No %ld: already in progress!\n", msgnum);              
                return; /* s.b. else is already processing... */
        }
 
@@ -1098,7 +1102,7 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
         * Bail out if there's no actual message associated with this
         */
        if (MyQItem->MessageID < 0L) {
-               CtdlLogPrintf(CTDL_ERR, "SMTP client: no 'msgid' directive found!\n");
+               CtdlLogPrintf(CTDL_ERR, "SMTP Queue: no 'msgid' directive found!\n");
                It = GetNewHashPos(MyQItem->MailQEntries, 0);
                citthread_mutex_lock(&ActiveQItemsLock);
                {
@@ -1111,6 +1115,14 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
                return;
        }
 
+       It = GetNewHashPos(MyQItem->MailQEntries, 0);
+       while (GetNextHashPos(MyQItem->MailQEntries, It, &len, &Key, &vQE))
+       {
+               MailQEntry *ThisItem = vQE;
+               CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Task: <%s> %d\n", ChrPtr(ThisItem->Recipient), ThisItem->Active);
+       }
+       DeleteHashPos(&It);
+
        CountActiveQueueEntries(MyQItem);
        if (MyQItem->ActiveDeliveries > 0)
        {
@@ -1122,7 +1134,7 @@ void smtp_do_procmsg(long msgnum, void *userdata) {
                {
                        MailQEntry *ThisItem = vQE;
                        if (ThisItem->Active == 1) {
-                               CtdlLogPrintf(CTDL_DEBUG, "SMTP client: Trying <%s>\n", ChrPtr(ThisItem->Recipient));
+                               CtdlLogPrintf(CTDL_DEBUG, "SMTP Queue: Trying <%s>\n", ChrPtr(ThisItem->Recipient));
                                smtp_try(MyQItem, ThisItem, Msg, (i == MyQItem->ActiveDeliveries));
                                i++;
                        }
@@ -1197,6 +1209,8 @@ void *smtp_queue_thread(void *arg) {
        int num_processed = 0;
        struct CitContext smtp_queue_CC;
 
+       CtdlThreadSleep(10);
+
        CtdlFillSystemContext(&smtp_queue_CC, "SMTP Send");
        citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
        CtdlLogPrintf(CTDL_DEBUG, "smtp_queue_thread() initializing\n");