2 * Copyright (c) 1998-2009 by the citadel.org team
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 3 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 #include <sys/types.h>
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
36 # include <sys/time.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
51 #include "citserver.h"
54 #include "ctdl_module.h"
56 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
58 #include "event_client.h"
/* Descriptor pair handed to pipe() in InitEventQueue(); both slots start out as -1. */
60 int event_add_pipe[2] = {-1, -1};
62 citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
/* Hash created in InitEventQueue() and destroyed at the end of client_event_thread(). */
63 HashList *QueueEvents = NULL;
/*
 * Points at whichever entry of InboundEventQueues[] is currently selected.
 * QueueEventAddCallback() switches it to the other entry while holding
 * EventQueueMutex.
 */
65 HashList *InboundEventQueue = NULL;
/* The two hashes that InboundEventQueue alternates between. */
66 HashList *InboundEventQueues[2] = { NULL, NULL };
/* libev loop handle; assigned from ev_default_loop() in client_event_thread(). */
68 struct ev_loop *event_base;
/* Async watcher bound to EventExitCallback() in client_event_thread(). */
71 ev_async ExitEventLoop;
/*
 * libev async callback; client_event_thread() registers it on the AddJob
 * watcher.
 *
 * While holding EventQueueMutex it switches InboundEventQueue to the other
 * entry of InboundEventQueues[] and keeps the previously selected hash in q.
 * After unlocking, it walks q with GetNewHashPos()/GetNextHashPos(), calls
 * DeleteHashContent() on q and logs completion.
 *
 * NOTE(review): the declarations of q, It, len, Key and v, the else line
 * between the two assignments pairs, and the body of the while loop are not
 * visible in this excerpt, so what is done with each entry cannot be
 * confirmed from here.
 */
73 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
81 /* get the control command... */
82 citthread_mutex_lock(&EventQueueMutex);
/* If slot 0 is the selected queue: select slot 1 and drain slot 0. */
84 if (InboundEventQueues[0] == InboundEventQueue) {
85 InboundEventQueue = InboundEventQueues[1];
86 q = InboundEventQueues[0];
/* Presumably the else branch (TODO confirm): select slot 0 and drain slot 1. */
89 InboundEventQueue = InboundEventQueues[0];
90 q = InboundEventQueues[1];
92 citthread_mutex_unlock(&EventQueueMutex);
/* From here on q is accessed without holding EventQueueMutex. */
94 It = GetNewHashPos(q, 0);
95 while (GetNextHashPos(q, It, &len, &Key, &v))
/* Presumably clears the entries of q but keeps the hash itself; verify against libcitadel. */
101 DeleteHashContent(&q);
102 CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
/*
 * libev async callback; client_event_thread() registers it on the
 * ExitEventLoop watcher. It calls ev_unloop() on event_base with
 * EVUNLOOP_ALL and then logs that the event queue is exiting.
 * Neither w nor revents is used in the visible code.
 */
106 static void EventExitCallback(EV_P_ ev_async *w, int revents)
108 ev_unloop(event_base, EVUNLOOP_ALL);
110 CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n");
/*
 * Prepares the shared state of the event client: initializes
 * EventQueueMutex, creates the event_add_pipe descriptor pair, allocates
 * QueueEvents and both InboundEventQueues[] hashes, and selects
 * InboundEventQueues[0] as the initial InboundEventQueue.
 */
115 void InitEventQueue(void)
117 struct rlimit LimitSet;
119 citthread_mutex_init(&EventQueueMutex, NULL);
/*
 * pipe() failure is logged at CTDL_EMERG.
 * NOTE(review): the rest of this branch is not visible in this excerpt, so
 * it is unclear whether the function returns or carries on after the log.
 */
121 if (pipe(event_add_pipe) != 0) {
122 CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
/*
 * NOTE(review): setrlimit() expects an RLIMIT_* resource identifier as its
 * first argument, not a file descriptor. Passing event_add_pipe[1] means the
 * numeric value of that descriptor decides which process resource gets its
 * soft and hard limit set to 1, and the return value is ignored. This looks
 * like a bug; confirm the intent and most likely drop the call.
 */
125 LimitSet.rlim_cur = 1;
126 LimitSet.rlim_max = 1;
127 setrlimit(event_add_pipe[1], &LimitSet);
129 QueueEvents = NewHash(1, Flathash);
130 InboundEventQueues[0] = NewHash(1, Flathash);
131 InboundEventQueues[1] = NewHash(1, Flathash);
/* Slot 0 is the queue selected first. */
132 InboundEventQueue = InboundEventQueues[0];
135 * this thread operates the select() etc. via libev.
/*
 * Thread entry point; CTDL_MODULE_INIT(event_client) passes it to
 * CtdlThreadCreate().
 *
 * Fills a system context, obtains the default libev loop, starts the AddJob
 * and ExitEventLoop async watchers and runs ev_loop(). Once ev_loop()
 * returns it clears the system context, destroys the default loop, deletes
 * the hashes created by InitEventQueue() and destroys EventQueueMutex.
 *
 * arg is not used in the visible code.
 *
 * NOTE(review): the declaration of AddJob and this function's return
 * statement are not visible in this excerpt.
 */
139 void *client_event_thread(void *arg)
141 struct CitContext libevent_client_CC;
143 CtdlFillSystemContext(&libevent_client_CC, "LibEv Thread");
144 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
145 CtdlLogPrintf(CTDL_DEBUG, "client_ev_thread() initializing\n");
147 event_base = ev_default_loop (EVFLAG_AUTO);
/* AddJob triggers QueueEventAddCallback(); ExitEventLoop triggers EventExitCallback(). */
149 ev_async_init(&AddJob, QueueEventAddCallback);
150 ev_async_start(event_base, &AddJob);
151 ev_async_init(&ExitEventLoop, EventExitCallback);
152 ev_async_start(event_base, &ExitEventLoop);
/* Runs the loop; EventExitCallback() requests its termination via ev_unloop(). */
154 ev_loop (event_base, 0);
155 CtdlClearSystemContext();
156 ev_default_destroy ();
/* Tear down the state set up in InitEventQueue(). */
158 DeleteHash(&QueueEvents);
/* Cleared first because it aliases one of the two hashes deleted next. */
159 InboundEventQueue = NULL;
160 DeleteHash(&InboundEventQueues[0]);
161 DeleteHash(&InboundEventQueues[1]);
162 citthread_mutex_destroy(&EventQueueMutex);
/*
 * Module initialization hook for event_client. When
 * EXPERIMENTAL_SMTP_EVENT_CLIENT is defined, it starts client_event_thread()
 * through CtdlThreadCreate() under the name "Client event" with the
 * CTDLTHREAD_BIGSTACK flag.
 *
 * NOTE(review): the lines between the #ifdef and the thread creation, and
 * the remainder of this block after the todo, are not visible in this
 * excerpt.
 */
170 CTDL_MODULE_INIT(event_client)
172 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
176 CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
177 /// todo register shutdown callback.