2 * Copyright (c) 1998-2009 by the citadel.org team
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 3 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 #include <sys/types.h>
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
36 # include <sys/time.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
51 #include "citserver.h"
54 #include "ctdl_module.h"
56 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
58 #include "event_client.h"
/*
 * File-scope state shared between InitEventQueue(), client_event_thread()
 * and QueueEventAddCallback().
 */
/*
 * Control pipe filled in by pipe() in InitEventQueue(). Index [0] is the end
 * that queue_add_event watches for EV_READ; both entries stay -1 until the
 * pipe has been created.
 */
60 int event_add_pipe[2] = {-1, -1};
/*
 * QueueEventAddCallback() holds this mutex while it switches
 * InboundEventQueue from one InboundEventQueues[] slot to the other.
 */
62 citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
/*
 * Created in InitEventQueue() and deleted in client_event_thread(). Nothing
 * in the visible code inserts into it yet (see the TODO in
 * QueueEventAddCallback()).
 */
63 HashList *QueueEvents = NULL;
/*
 * Always refers to one of the two InboundEventQueues[] slots; the callback
 * flips it to the other slot and drains the one it just retired.
 * NOTE(review): presumably this is the queue producers append to under
 * EventQueueMutex -- no producer is visible here, confirm.
 */
65 HashList *InboundEventQueue = NULL;
/* The two hash lists that InboundEventQueue alternates between. */
66 HashList *InboundEventQueues[2] = { NULL, NULL };
/* libev loop handle, assigned from ev_default_loop() in client_event_thread(). */
68 struct ev_loop *event_base;
/* libev I/O watcher bound to event_add_pipe[0] with QueueEventAddCallback(). */
69 struct ev_io queue_add_event;
/*
 * libev callback installed on queue_add_event by client_event_thread(); it is
 * registered for EV_READ on event_add_pipe[0].
 *
 * Steps that can be seen in this function:
 *   - read one control byte from the watcher's file descriptor;
 *   - under EventQueueMutex, switch InboundEventQueue to the other
 *     InboundEventQueues[] slot and keep the retired slot in q;
 *   - iterate over q and then call DeleteHashContent() on it;
 *   - close the read end of the pipe, stop the watcher and ask the loop to
 *     unwind;
 *   - log completion at CTDL_DEBUG.
 *
 * NOTE(review): how the control byte selects between the drain step and the
 * teardown step is not evident from the statements shown here -- confirm
 * which of them run for a given command before relying on this summary.
 *
 * loop    - the libev loop delivering the event (the visible statements use
 *           the global event_base instead).
 * watcher - the I/O watcher that fired; only watcher->fd is read.
 * revents - event mask from libev; not referenced in the visible statements.
 */
71 static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int revents)
80 /* get the control command... */
/*
 * NOTE(review): the result of read() is discarded, so a short read, EINTR or
 * an error would go unnoticed and buf would keep whatever it held before.
 */
81 read(watcher->fd, buf, 1);
/*
 * Swap the two inbound queues while holding the mutex: whichever slot is
 * current is retired into q and the other slot becomes current.
 */
84 citthread_mutex_lock(&EventQueueMutex);
86 if (InboundEventQueues[0] == InboundEventQueue) {
87 InboundEventQueue = InboundEventQueues[1];
88 q = InboundEventQueues[0];
91 InboundEventQueue = InboundEventQueues[0];
92 q = InboundEventQueues[1];
94 citthread_mutex_unlock(&EventQueueMutex);
/* The retired queue is walked after the mutex has been released. */
96 It = GetNewHashPos(q, 0);
97 while (GetNextHashPos(q, It, &len, &Key, &v))
/*
 * NOTE(review): presumably this empties q but keeps the hash list itself,
 * since the same slot becomes current again on the next swap -- confirm
 * against the libcitadel hash API.
 */
103 DeleteHashContent(&q);
104 /// TODO: add it to QueueEvents
/*
 * Teardown: close the read end of the control pipe, detach the watcher and
 * request that every running ev_loop() on event_base return.
 */
107 close(event_add_pipe[0]);
108 /// TODO; flush QueueEvents fd's and delete it.
109 ev_io_stop(event_base, &queue_add_event);
110 ev_unloop(event_base, EVUNLOOP_ALL);
112 CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
/*
 * Prepare the shared state used by the libev client thread: initialise
 * EventQueueMutex, create the control pipe in event_add_pipe, and allocate
 * QueueEvents plus both InboundEventQueues[] slots. InboundEventQueue starts
 * out pointing at slot 0.
 *
 * Takes no arguments and returns nothing; a pipe() failure is reported
 * through CtdlLogPrintf() at CTDL_EMERG.
 */
116 void InitEventQueue(void)
118 struct rlimit LimitSet;
120 citthread_mutex_init(&EventQueueMutex, NULL);
/* On failure the errno text from pipe() is included in the log message. */
122 if (pipe(event_add_pipe) != 0) {
123 CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
/*
 * NOTE(review): setrlimit() expects a resource identifier (an RLIMIT_*
 * constant) as its first argument, but the write end of the pipe is passed
 * here. The descriptor's numeric value is therefore interpreted as a
 * resource number, so this either fails with EINVAL or lowers an unrelated
 * process limit to 1. The return value is not checked either. The intent
 * (possibly limiting the pipe in some way) cannot be determined from this
 * code -- confirm and most likely remove.
 */
126 LimitSet.rlim_cur = 1;
127 LimitSet.rlim_max = 1;
128 setrlimit(event_add_pipe[1], &LimitSet);
/* All three hash lists are created with the same NewHash(1, Flathash) call. */
130 QueueEvents = NewHash(1, Flathash);
131 InboundEventQueues[0] = NewHash(1, Flathash);
132 InboundEventQueues[1] = NewHash(1, Flathash);
/* Slot 0 is the initial current inbound queue. */
133 InboundEventQueue = InboundEventQueues[0];
136 * this thread operates the select() etc. via libev.
/*
 * Thread entry point that drives the libev loop for client events.
 *
 * It fills a system context labelled "LibEv Thread", obtains the default
 * libev loop, registers queue_add_event for EV_READ on event_add_pipe[0] with
 * QueueEventAddCallback() as handler, and then stays inside ev_loop() until
 * the loop is told to stop (QueueEventAddCallback() calls ev_unloop()).
 * After the loop returns it clears the system context, destroys the default
 * loop, deletes QueueEvents and both inbound queues, and destroys
 * EventQueueMutex.
 *
 * arg - thread argument; not referenced in the visible statements.
 *
 * NOTE(review): the value returned to the thread library is not visible in
 * the statements shown here.
 */
140 void *client_event_thread(void *arg)
142 struct CitContext libevent_client_CC;
144 CtdlFillSystemContext(&libevent_client_CC, "LibEv Thread");
145 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
146 CtdlLogPrintf(CTDL_DEBUG, "client_ev_thread() initializing\n");
/* EVFLAG_AUTO leaves the choice of polling backend to libev. */
148 event_base = ev_default_loop (EVFLAG_AUTO);
/* Watch the read end of the control pipe created by InitEventQueue(). */
150 ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ);
151 ev_io_start(event_base, &queue_add_event);
/* Blocks here dispatching events until the loop is unwound. */
153 ev_loop (event_base, 0);
/* Shutdown: release the context, the loop and every queue set up by InitEventQueue(). */
154 CtdlClearSystemContext();
155 ev_default_destroy ();
157 DeleteHash(&QueueEvents);
/* Cleared before the slots it may point at are deleted below. */
158 InboundEventQueue = NULL;
159 DeleteHash(&InboundEventQueues[0]);
160 DeleteHash(&InboundEventQueues[1]);
161 citthread_mutex_destroy(&EventQueueMutex);
169 CTDL_MODULE_INIT(event_client)
171 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
175 CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
176 /// todo register shutdown callback.