Libevent Migration
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include "citadel.h"
50 #include "server.h"
51 #include "citserver.h"
52 #include "support.h"
53
54 #include "ctdl_module.h"
55
56 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
57
58 #include "event_client.h"
59
60 int event_add_pipe[2] = {-1, -1};
61
62 citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
63 HashList *QueueEvents = NULL;
64
65 HashList *InboundEventQueue = NULL;
66 HashList *InboundEventQueues[2] = { NULL, NULL };
67
68 static struct event_base *event_base;
69 struct event queue_add_event;
70
71 static void QueueEventAddCallback(int fd, short event, void *ctx)
72 {
73         char buf[10];
74         HashList *q;
75         void *v;
76         HashPos  *It;
77         long len;
78         const char *Key;
79
80         /* get the control command... */
81         read(fd, buf, 1);
82         switch (buf[0]) {
83         case '+':
84                 citthread_mutex_lock(&EventQueueMutex);
85
86                 if (InboundEventQueues[0] == InboundEventQueue) {
87                         InboundEventQueue = InboundEventQueues[1];
88                         q = InboundEventQueues[0];
89                 }
90                 else {
91                         InboundEventQueue = InboundEventQueues[0];
92                         q = InboundEventQueues[1];
93                 }
94                 citthread_mutex_unlock(&EventQueueMutex);
95
96                 It = GetNewHashPos(q, 0);
97                 while (GetNextHashPos(q, It, &len, &Key, &v))
98                 {
99                         IOAddHandler *h = v;
100                         h->EvAttch(h->Ctx);
101                 }
102                 DeleteHashPos(&It);
103                 DeleteHashContent(&q);
104 /// TODO: add it to QueueEvents
105                 break;
106         case 'x':
107                 event_del(&queue_add_event);
108                 close(event_add_pipe[0]);
109 /// TODO; flush QueueEvents fd's and delete it.
110                 event_base_loopexit(event_base, NULL);
111         }
112         /* Unblock the other side */
113 //      read(fd, buf, 1);
114         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
115 }
116
117
118 void InitEventQueue(void)
119 {
120         struct rlimit LimitSet;
121
122         event_base = event_init();
123 /*
124         base = event_base_new();
125         if (!base)
126                 return NULL; / *XXXerr*/
127
128         citthread_mutex_init(&EventQueueMutex, NULL);
129
130         if (pipe(event_add_pipe) != 0) {
131                 CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libevent queueing: %s\n", strerror(errno));
132                 abort();
133         }
134         LimitSet.rlim_cur = 1;
135         LimitSet.rlim_max = 1;
136         setrlimit(event_add_pipe[1], &LimitSet);
137
138         QueueEvents = NewHash(1, Flathash);
139         InboundEventQueues[0] = NewHash(1, Flathash);
140         InboundEventQueues[1] = NewHash(1, Flathash);
141         InboundEventQueue = InboundEventQueues[0];
142 }
143 /*
144  * this thread operates the select() etc. via libevent.
145  * 
146  * 
147  */
148 void *client_event_thread(void *arg) 
149 {
150         struct CitContext libevent_client_CC;
151         CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread");
152 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
153         CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n");
154         
155         event_set(&queue_add_event, 
156                   event_add_pipe[0], 
157                   EV_READ|EV_PERSIST,
158                   QueueEventAddCallback, 
159                   NULL);
160         
161         event_add(&queue_add_event, NULL);
162
163
164         event_dispatch();
165         CtdlClearSystemContext();
166         event_base_free(event_base);
167         citthread_mutex_destroy(&EventQueueMutex);
168         return(NULL);
169 }
170
171 #endif
172
173 CTDL_MODULE_INIT(event_client)
174 {
175 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
176         if (!threading)
177         {
178                 InitEventQueue();
179                 CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
180 /// todo register shutdown callback.
181         }
182 #endif
183         return "event";
184 }