Libevent Migration
[citadel.git] / citadel / event_client.c
1 /*
2  *
3  * Copyright (c) 1998-2009 by the citadel.org team
4  *
5  *  This program is free software; you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation; either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, write to the Free Software
17  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18  */
19
20 #include "sysdep.h"
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <termios.h>
25 #include <fcntl.h>
26 #include <signal.h>
27 #include <pwd.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <syslog.h>
31
32 #if TIME_WITH_SYS_TIME
33 # include <sys/time.h>
34 # include <time.h>
35 #else
36 # if HAVE_SYS_TIME_H
37 #  include <sys/time.h>
38 # else
39 #  include <time.h>
40 # endif
41 #endif
42 #include <sys/wait.h>
43 #include <ctype.h>
44 #include <string.h>
45 #include <limits.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include "citadel.h"
51 #include "server.h"
52 #include "citserver.h"
53 #include "support.h"
54 #include "config.h"
55 #include "control.h"
56 #include "user_ops.h"
57 #include "database.h"
58 #include "msgbase.h"
59 #include "internet_addressing.h"
60 #include "genstamp.h"
61 #include "domain.h"
62 #include "clientsocket.h"
63 #include "locate_host.h"
64 #include "citadel_dirs.h"
65
66 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
67
68 #include <event.h>
69 #include "event_client.h"
70
71 extern int event_add_pipe[2];
72 extern citthread_mutex_t EventQueueMutex;
73 extern HashList *InboundEventQueue;
74
75 #define SEND_EVENT 1
76 #define RECV_EVENT 2
77         
78 int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB)
79 {
80         IOAddHandler *h;
81         int i;
82
83         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
84         h->Ctx = Ctx;
85         h->EvAttch = CB;
86
87         citthread_mutex_lock(&EventQueueMutex);
88         if (event_add_pipe[1] == -1) {
89                 citthread_mutex_unlock(&EventQueueMutex);
90                 free (h);
91                 return -1;
92         }
93         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
94         i = GetCount(InboundEventQueue);
95         Put(InboundEventQueue, IKEY(i), h, NULL);
96         citthread_mutex_unlock(&EventQueueMutex);
97
98         write(event_add_pipe[1], "+_", 1);
99         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n");
100         return 0;
101 }
102
103
104 int ShutDownEventQueue(void)
105 {
106         citthread_mutex_lock(&EventQueueMutex);
107         if (event_add_pipe[1] == -1) {
108                 citthread_mutex_unlock(&EventQueueMutex);
109
110                 return -1;
111         }
112         write(event_add_pipe[1], "x_", 1);
113         close(event_add_pipe[1]);
114         event_add_pipe[1] = -1;
115         citthread_mutex_unlock(&EventQueueMutex);
116         return 0;
117 }
118
119 void FreeAsyncIOContents(AsyncIO *IO)
120 {
121         FreeStrBuf(&IO->IOBuf);
122         FreeStrBuf(&IO->SendBuf.Buf);
123         FreeStrBuf(&IO->RecvBuf.Buf);
124
125 }
126
127 /*
128   static void
129   setup_signal_handlers(struct instance *instance)
130   {
131   signal(SIGPIPE, SIG_IGN);
132
133   event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST,
134   exit_event_callback, instance);
135   event_add(&instance->sigterm_event, NULL);
136
137   event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST,
138   exit_event_callback, instance);
139   event_add(&instance->sigint_event, NULL);
140
141   event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST,
142   exit_event_callback, instance);
143   event_add(&instance->sigquit_event, NULL);
144   }
145 */
146
147 void ShutDownCLient(AsyncIO *IO)
148 {
149         CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock);
150         switch (IO->active_event) {
151         case SEND_EVENT:
152                 event_del(&IO->send_event);
153                 break;
154         case RECV_EVENT:
155                 event_del(&IO->recv_event);
156                 break;
157         case 0:
158                 // no event active here; just bail out.
159                 break;
160         }
161         IO->active_event = 0;
162         IO->Terminate(IO->Data);
163
164 //      citthread_mutex_lock(&EventQueueMutex);
165
166 ///QueueEvents /// todo remove from hash.
167
168 //      citthread_mutex_unlock(&EventQueueMutex);
169 }
170
171 eReadState HandleInbound(AsyncIO *IO)
172 {
173         eReadState Finished = eBufferNotEmpty;
174                 
175         while ((Finished == eBufferNotEmpty) && (IO->NextState == eReadMessage)){
176                 if (IO->RecvBuf.nBlobBytesWanted != 0) { 
177                                 
178                 }
179                 else { /* Reading lines... */
180 //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP
181                         if (IO->LineReader)
182                                 Finished = IO->LineReader(IO);
183                         else 
184                                 Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
185                                 
186                         switch (Finished) {
187                         case eMustReadMore: /// read new from socket... 
188                                 return Finished;
189                                 break;
190                         case eBufferNotEmpty: /* shouldn't happen... */
191                         case eReadSuccess: /// done for now...
192                                 break;
193                         case eReadFail: /// WHUT?
194                                 ///todo: shut down! 
195                                 break;
196                         }
197                                         
198                 }
199                         
200                 if (Finished != eMustReadMore) {
201                         event_del(&IO->recv_event);
202                         IO->active_event = 0;
203                         IO->NextState = IO->ReadDone(IO->Data);
204                         Finished = StrBufCheckBuffer(&IO->RecvBuf);
205                 }
206         }
207
208
209         if ((IO->NextState == eSendReply) ||
210             (IO->NextState == eSendMore))
211         {
212                 IO->NextState = IO->SendDone(IO->Data);
213                 event_add(&IO->send_event, NULL);
214                 IO->active_event = SEND_EVENT;
215                         
216         }
217         else if ((IO->NextState == eTerminateConnection) ||
218                  (IO->NextState == eAbort) )
219                 ShutDownCLient(IO);
220         return Finished;
221 }
222
223
224 static void
225 IO_send_callback(int fd, short event, void *ctx)
226 {
227         int rc;
228         AsyncIO *IO = ctx;
229
230         (void)fd;
231         (void)event;
232         CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d  : [%s%s%s%s]\n",
233                       (int) fd,
234                       (event&EV_TIMEOUT) ? " timeout" : "",
235                       (event&EV_READ)    ? " read" : "",
236                       (event&EV_WRITE)   ? " write" : "",
237                       (event&EV_SIGNAL)  ? " signal" : "");
238
239 ///    assert(fd == IO->sock);
240         
241         rc = StrBuf_write_one_chunk_callback(fd, event, &IO->SendBuf);
242
243         if (rc == 0)
244         {
245                 
246 #ifdef BIGBAD_IODBG
247                 {
248                         int rv = 0;
249                         char fn [SIZ];
250                         FILE *fd;
251                         const char *pch = ChrPtr(IO->SendBuf.Buf);
252                         const char *pchh = IO->SendBuf.ReadWritePointer;
253                         long nbytes;
254
255                         if (pchh == NULL)
256                                 pchh = pch;
257                         
258                         nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
259                         snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", "smtpev", IO->sock);
260                 
261                         fd = fopen(fn, "a+");
262                         fprintf(fd, "Read: BufSize: %ld BufContent: [",
263                                 nbytes);
264                         rv = fwrite(pchh, nbytes, 1, fd);
265                         fprintf(fd, "]\n");
266                 
267                         
268                         fclose(fd);
269                 }
270 #endif
271                 event_del(&IO->send_event);
272                 IO->active_event = 0;
273                 switch (IO->NextState) {
274                 case eSendReply:
275                         break;
276                 case eSendMore:
277                         IO->NextState = IO->SendDone(IO->Data);
278
279                         if ((IO->NextState == eTerminateConnection) ||
280                             (IO->NextState == eAbort) )
281                                 ShutDownCLient(IO);
282                         else {
283                                 event_add(&IO->send_event, NULL);
284                                 IO->active_event = SEND_EVENT;
285                         }
286                         break;
287                 case eReadMessage:
288                         if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
289                                 HandleInbound(IO);
290                         }
291                         else {
292                                 event_add(&IO->recv_event, NULL);
293                                 IO->active_event = RECV_EVENT;
294                         }
295
296                         break;
297                 case eAbort:
298                         break;
299                 }
300         }
301         else if (rc > 0)
302                 ShutDownCLient(IO);
303 //      else 
304                 ///abort!
305 }
306
307
308 static void
309 IO_recv_callback(int fd, short event, void *ctx)
310 {
311         ssize_t nbytes;
312         AsyncIO *IO = ctx;
313
314 //    assert(fd == IO->sock);
315         
316 //    assert(fd == sb->fd);
317
318         CtdlLogPrintf(CTDL_DEBUG, "EVENT <- %d  : [%s%s%s%s]\n",
319             (int) fd,
320             (event&EV_TIMEOUT) ? " timeout" : "",
321             (event&EV_READ)    ? " read" : "",
322             (event&EV_WRITE)   ? " write" : "",
323             (event&EV_SIGNAL)  ? " signal" : "");
324         nbytes = StrBuf_read_one_chunk_callback(fd, event, &IO->RecvBuf);
325         if (nbytes > 0) {
326                 HandleInbound(IO);
327         } else if (nbytes == 0) {
328                 ShutDownCLient(IO);
329 ///  TODO: this is a timeout???  sock_buff_invoke_free(sb, 0); seems as if socket is gone then?
330                 return;
331         } else if (nbytes == -1) {
332 /// TODO: FD is gone. kick it.        sock_buff_invoke_free(sb, errno);
333                 return;
334         }
335 }
336
337 void InitEventIO(AsyncIO *IO, 
338                  void *pData, 
339                  IO_CallBack ReadDone, 
340                  IO_CallBack SendDone, 
341                  IO_CallBack Terminate, 
342                  IO_LineReaderCallback LineReader,
343                  int ReadFirst)
344 {
345         IO->Data = pData;
346         IO->SendDone = SendDone;
347         IO->ReadDone = ReadDone;
348         IO->Terminate = Terminate;
349         IO->LineReader = LineReader;
350
351         event_set(&IO->recv_event, 
352                   IO->sock, 
353                   EV_READ|EV_PERSIST,
354                   IO_recv_callback, 
355                   IO);
356
357         event_set(&IO->send_event, 
358                   IO->sock,
359                   EV_WRITE|EV_PERSIST,
360                   IO_send_callback, 
361                   IO);
362
363         if (ReadFirst) {
364                 IO->NextState = eReadMessage;
365                 event_add(&IO->recv_event, NULL);
366                 IO->active_event = RECV_EVENT;
367         }
368         else {
369                 IO->NextState = eSendReply;
370         }
371 }
372
373
374 #endif