libev migration - shutdown cleanly.
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include "citadel.h"
50 #include "server.h"
51 #include "citserver.h"
52 #include "support.h"
53
54 #include "ctdl_module.h"
55
56 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
57
58 #include "event_client.h"
59
60 int event_add_pipe[2] = {-1, -1};
61
62 citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
63 HashList *QueueEvents = NULL;
64
65 HashList *InboundEventQueue = NULL;
66 HashList *InboundEventQueues[2] = { NULL, NULL };
67
68 struct ev_loop *event_base;
69 struct ev_io queue_add_event;
70
71 static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int revents)
72 {
73         char buf[10];
74         HashList *q;
75         void *v;
76         HashPos  *It;
77         long len;
78         const char *Key;
79
80         /* get the control command... */
81         read(watcher->fd, buf, 1);
82         switch (buf[0]) {
83         case '+':
84                 citthread_mutex_lock(&EventQueueMutex);
85
86                 if (InboundEventQueues[0] == InboundEventQueue) {
87                         InboundEventQueue = InboundEventQueues[1];
88                         q = InboundEventQueues[0];
89                 }
90                 else {
91                         InboundEventQueue = InboundEventQueues[0];
92                         q = InboundEventQueues[1];
93                 }
94                 citthread_mutex_unlock(&EventQueueMutex);
95
96                 It = GetNewHashPos(q, 0);
97                 while (GetNextHashPos(q, It, &len, &Key, &v))
98                 {
99                         IOAddHandler *h = v;
100                         h->EvAttch(h->Ctx);
101                 }
102                 DeleteHashPos(&It);
103                 DeleteHashContent(&q);
104 /// TODO: add it to QueueEvents
105                 break;
106         case 'x':
107                 /////event_del(&queue_add_event);
108                 close(event_add_pipe[0]);
109 /// TODO; flush QueueEvents fd's and delete it.
110                 ev_io_stop(event_base, &queue_add_event);
111                 ev_unloop(event_base, EVUNLOOP_ALL);
112         }
113         /* Unblock the other side */
114 //      read(fd, buf, 1);
115         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
116 }
117
118
119 void InitEventQueue(void)
120 {
121         struct rlimit LimitSet;
122
123 ///     event_base = ev_default_loop(0);
124 /*
125         base = event_base_new();
126         if (!base)
127                 return NULL; / *XXXerr*/
128
129         citthread_mutex_init(&EventQueueMutex, NULL);
130
131         if (pipe(event_add_pipe) != 0) {
132                 CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
133                 abort();
134         }
135         LimitSet.rlim_cur = 1;
136         LimitSet.rlim_max = 1;
137         setrlimit(event_add_pipe[1], &LimitSet);
138
139         QueueEvents = NewHash(1, Flathash);
140         InboundEventQueues[0] = NewHash(1, Flathash);
141         InboundEventQueues[1] = NewHash(1, Flathash);
142         InboundEventQueue = InboundEventQueues[0];
143 }
144 /*
145  * this thread operates the select() etc. via libev.
146  * 
147  * 
148  */
149 void *client_event_thread(void *arg) 
150 {
151         struct CitContext libevent_client_CC;
152
153         CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread");
154 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
155         CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n");
156 /*      
157         event_set(&queue_add_event, 
158                   event_add_pipe[0], 
159                   EV_READ|EV_PERSIST,
160                   QueueEventAddCallback, 
161                   NULL);
162         
163         event_add(&queue_add_event, NULL);
164 */
165 /*
166         ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ);
167         ev_io_start(event_base, &queue_add_event);
168
169 */
170         event_base = ev_default_loop (EVFLAG_AUTO);
171 ///     ev_loop(event_base, 0);
172
173         ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ);
174         ev_io_start(event_base, &queue_add_event);
175
176         ev_loop (event_base, 0);
177         CtdlClearSystemContext();
178         ev_default_destroy ();
179         citthread_mutex_destroy(&EventQueueMutex);
180         return(NULL);
181 }
182
183 #endif
184
/*
 * Module entry point for the event client.
 *
 * When EXPERIMENTAL_SMTP_EVENT_CLIENT is defined and `threading` is zero,
 * the event queue is initialised and the "Client event" thread is started.
 * (`threading` is not declared in this block; presumably it is the
 * parameter supplied by the CTDL_MODULE_INIT macro -- confirm in
 * ctdl_module.h.)
 *
 * Always returns the module name "event".
 */
CTDL_MODULE_INIT(event_client)
{
#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
	if (!threading) {
		/* The queue must exist before the thread that drains it starts. */
		InitEventQueue();
		CtdlThreadCreate("Client event",
				 CTDLTHREAD_BIGSTACK,
				 client_event_thread,
				 NULL);
		/* TODO: register shutdown callback. */
	}
#endif
	return "event";
}