libev migration: use async methods to schedule jobs. thanks to Marc Lehmann for the...
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include "citadel.h"
50 #include "server.h"
51 #include "citserver.h"
52 #include "support.h"
53
54 #include "ctdl_module.h"
55
56 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
57
58 #include "event_client.h"
59
60 int event_add_pipe[2] = {-1, -1};
61
62 citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
63 HashList *QueueEvents = NULL;
64
65 HashList *InboundEventQueue = NULL;
66 HashList *InboundEventQueues[2] = { NULL, NULL };
67
68 struct ev_loop *event_base;
69
70 ev_async AddJob;   
71 ev_async ExitEventLoop;
72
73 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
74 {
75         HashList *q;
76         void *v;
77         HashPos  *It;
78         long len;
79         const char *Key;
80
81         /* get the control command... */
82         citthread_mutex_lock(&EventQueueMutex);
83
84         if (InboundEventQueues[0] == InboundEventQueue) {
85                 InboundEventQueue = InboundEventQueues[1];
86                 q = InboundEventQueues[0];
87         }
88         else {
89                 InboundEventQueue = InboundEventQueues[0];
90                 q = InboundEventQueues[1];
91         }
92         citthread_mutex_unlock(&EventQueueMutex);
93
94         It = GetNewHashPos(q, 0);
95         while (GetNextHashPos(q, It, &len, &Key, &v))
96         {
97                 IOAddHandler *h = v;
98                 h->EvAttch(h->IO);
99         }
100         DeleteHashPos(&It);
101         DeleteHashContent(&q);
102         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
103 }
104
105
106 static void EventExitCallback(EV_P_ ev_async *w, int revents)
107 {
108         ev_unloop(event_base, EVUNLOOP_ALL);
109
110         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n");
111 }
112
113
114
115 void InitEventQueue(void)
116 {
117         struct rlimit LimitSet;
118
119         citthread_mutex_init(&EventQueueMutex, NULL);
120
121         if (pipe(event_add_pipe) != 0) {
122                 CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
123                 abort();
124         }
125         LimitSet.rlim_cur = 1;
126         LimitSet.rlim_max = 1;
127         setrlimit(event_add_pipe[1], &LimitSet);
128
129         QueueEvents = NewHash(1, Flathash);
130         InboundEventQueues[0] = NewHash(1, Flathash);
131         InboundEventQueues[1] = NewHash(1, Flathash);
132         InboundEventQueue = InboundEventQueues[0];
133 }
134 /*
135  * this thread operates the select() etc. via libev.
136  * 
137  * 
138  */
139 void *client_event_thread(void *arg) 
140 {
141         struct CitContext libevent_client_CC;
142
143         CtdlFillSystemContext(&libevent_client_CC, "LibEv Thread");
144 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
145         CtdlLogPrintf(CTDL_DEBUG, "client_ev_thread() initializing\n");
146
147         event_base = ev_default_loop (EVFLAG_AUTO);
148
149         ev_async_init(&AddJob, QueueEventAddCallback);
150         ev_async_start(event_base, &AddJob);
151         ev_async_init(&ExitEventLoop, EventExitCallback);
152         ev_async_start(event_base, &ExitEventLoop);
153
154         ev_loop (event_base, 0);
155         CtdlClearSystemContext();
156         ev_default_destroy ();
157         
158         DeleteHash(&QueueEvents);
159         InboundEventQueue = NULL;
160         DeleteHash(&InboundEventQueues[0]);
161         DeleteHash(&InboundEventQueues[1]);
162         citthread_mutex_destroy(&EventQueueMutex);
163
164
165         return(NULL);
166 }
167
168 #endif
169
170 CTDL_MODULE_INIT(event_client)
171 {
172 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
173         if (!threading)
174         {
175                 InitEventQueue();
176                 CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
177 /// todo register shutdown callback.
178         }
179 #endif
180         return "event";
181 }