Merge branch 'libevent' of ssh://git.citadel.org/appl/gitroot/citadel into libevent
[citadel.git] / citadel / event_client.c
1 /*
2  *
3  * Copyright (c) 1998-2009 by the citadel.org team
4  *
5  *  This program is free software; you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation; either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, write to the Free Software
17  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18  */
19
20 #include "sysdep.h"
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <termios.h>
25 #include <fcntl.h>
26 #include <signal.h>
27 #include <pwd.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <syslog.h>
31
32 #if TIME_WITH_SYS_TIME
33 # include <sys/time.h>
34 # include <time.h>
35 #else
36 # if HAVE_SYS_TIME_H
37 #  include <sys/time.h>
38 # else
39 #  include <time.h>
40 # endif
41 #endif
42 #include <sys/wait.h>
43 #include <ctype.h>
44 #include <string.h>
45 #include <limits.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <assert.h>
50
51 #include <libcitadel.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56 #include "config.h"
57 #include "control.h"
58 #include "user_ops.h"
59 #include "database.h"
60 #include "msgbase.h"
61 #include "internet_addressing.h"
62 #include "genstamp.h"
63 #include "domain.h"
64 #include "clientsocket.h"
65 #include "locate_host.h"
66 #include "citadel_dirs.h"
67
68 #include "event_client.h"
69
70 extern int event_add_pipe[2];
71 extern citthread_mutex_t EventQueueMutex;
72 extern HashList *InboundEventQueue;
73 extern struct ev_loop *event_base;
74
75         
76 int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
77 {
78         IOAddHandler *h;
79         int i;
80
81         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
82         h->IO = IO;
83         h->EvAttch = CB;
84
85         citthread_mutex_lock(&EventQueueMutex);
86         if (event_add_pipe[1] == -1) {
87                 citthread_mutex_unlock(&EventQueueMutex);
88                 free (h);
89                 return -1;
90         }
91         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
92         i = GetCount(InboundEventQueue);
93         Put(InboundEventQueue, IKEY(i), h, NULL);
94         citthread_mutex_unlock(&EventQueueMutex);
95
96         write(event_add_pipe[1], "+_", 1);
97         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n");
98         return 0;
99 }
100
101
102 int ShutDownEventQueue(void)
103 {
104         citthread_mutex_lock(&EventQueueMutex);
105         if (event_add_pipe[1] == -1) {
106                 citthread_mutex_unlock(&EventQueueMutex);
107
108                 return -1;
109         }
110         write(event_add_pipe[1], "x_", 1);
111         close(event_add_pipe[1]);
112         event_add_pipe[1] = -1;
113         citthread_mutex_unlock(&EventQueueMutex);
114         return 0;
115 }
116
117 void FreeAsyncIOContents(AsyncIO *IO)
118 {
119         FreeStrBuf(&IO->IOBuf);
120         FreeStrBuf(&IO->SendBuf.Buf);
121         FreeStrBuf(&IO->RecvBuf.Buf);
122 }
123
124
125 void ShutDownCLient(AsyncIO *IO)
126 {
127         CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock);
128
129         if (IO->sock != 0)
130         {
131                 ev_io_stop(event_base, &IO->send_event);
132                 ev_io_stop(event_base, &IO->recv_event);
133                 ev_timer_stop (event_base, &IO->rw_timeout);
134                 close(IO->sock);
135                 IO->sock = 0;
136                 IO->SendBuf.fd = 0;
137                 IO->RecvBuf.fd = 0;
138         }
139         if (IO->DNSChannel != NULL) {
140                 ares_destroy(IO->DNSChannel);
141                 ev_io_stop(event_base, &IO->dns_recv_event);
142                 ev_io_stop(event_base, &IO->dns_send_event);
143                 IO->DNSChannel = NULL;
144         }
145         assert(IO->Terminate);
146         IO->Terminate(IO);
147         
148 }
149
150
151 eReadState HandleInbound(AsyncIO *IO)
152 {
153         eReadState Finished = eBufferNotEmpty;
154         
155         while ((Finished == eBufferNotEmpty) && (IO->NextState == eReadMessage)){
156                 if (IO->RecvBuf.nBlobBytesWanted != 0) { 
157                                 
158                 }
159                 else { /* Reading lines... */
160 //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP
161                         if (IO->LineReader)
162                                 Finished = IO->LineReader(IO);
163                         else 
164                                 Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
165                                 
166                         switch (Finished) {
167                         case eMustReadMore: /// read new from socket... 
168                                 return Finished;
169                                 break;
170                         case eBufferNotEmpty: /* shouldn't happen... */
171                         case eReadSuccess: /// done for now...
172                                 break;
173                         case eReadFail: /// WHUT?
174                                 ///todo: shut down! 
175                                 break;
176                         }
177                                         
178                 }
179                         
180                 if (Finished != eMustReadMore) {
181                         assert(IO->ReadDone);
182                         ev_io_stop(event_base, &IO->recv_event);
183                         IO->NextState = IO->ReadDone(IO);
184                         Finished = StrBufCheckBuffer(&IO->RecvBuf);
185                 }
186         }
187
188
189         if ((IO->NextState == eSendReply) ||
190             (IO->NextState == eSendMore))
191         {
192                 assert(IO->SendDone);
193                 IO->NextState = IO->SendDone(IO);
194                 ev_io_start(event_base, &IO->send_event);
195         }
196         else if ((IO->NextState == eTerminateConnection) ||
197                  (IO->NextState == eAbort) )
198                 ShutDownCLient(IO);
199         return Finished;
200 }
201
202
203 static void
204 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
205 {
206         int rc;
207         AsyncIO *IO = watcher->data;
208
209         rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
210
211         if (rc == 0)
212         {               
213 #ifdef BIGBAD_IODBG
214                 {
215                         int rv = 0;
216                         char fn [SIZ];
217                         FILE *fd;
218                         const char *pch = ChrPtr(IO->SendBuf.Buf);
219                         const char *pchh = IO->SendBuf.ReadWritePointer;
220                         long nbytes;
221
222                         if (pchh == NULL)
223                                 pchh = pch;
224                         
225                         nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
226                         snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", "smtpev", IO->sock);
227                 
228                         fd = fopen(fn, "a+");
229                         fprintf(fd, "Read: BufSize: %ld BufContent: [",
230                                 nbytes);
231                         rv = fwrite(pchh, nbytes, 1, fd);
232                         fprintf(fd, "]\n");
233                 
234                         
235                         fclose(fd);
236                 }
237 #endif
238                 ev_io_stop(event_base, &IO->send_event);
239                 switch (IO->NextState) {
240                 case eSendReply:
241                         break;
242                 case eSendMore:
243                         assert(IO->SendDone);
244                         IO->NextState = IO->SendDone(IO);
245
246                         if ((IO->NextState == eTerminateConnection) ||
247                             (IO->NextState == eAbort) )
248                                 ShutDownCLient(IO);
249                         else {
250                                 ev_io_start(event_base, &IO->send_event);
251                         }
252                         break;
253                 case eReadMessage:
254                         if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
255                                 HandleInbound(IO);
256                         }
257                         else {
258                                 ev_io_start(event_base, &IO->recv_event);
259                         }
260
261                         break;
262                 case eTerminateConnection:
263                 case eAbort:
264                         break;
265                 }
266         }
267         else if (rc < 0) {
268                 assert(IO->Timeout);
269                 IO->Timeout(IO);
270         }
271         /* else : must write more. */
272 }
273 static void
274 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
275 {
276         
277         switch(IO->NextState) {
278         case eReadMessage:
279                 ev_io_start(event_base, &IO->recv_event);
280                 break;
281         case eSendReply:
282         case eSendMore:
283                 IO_send_callback(loop, &IO->send_event, revents);
284                 break;
285         case eTerminateConnection:
286         case eAbort:
287                 /// TODO: WHUT?
288                 break;
289         }
290 }
291
292 static void
293 IO_Timout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
294 {
295         AsyncIO *IO = watcher->data;
296
297         ev_timer_stop (event_base, &IO->rw_timeout);
298         assert(IO->Timeout);
299         IO->Timeout(IO);
300 }
301 static void
302 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
303 {
304         AsyncIO *IO = watcher->data;
305
306         ev_timer_stop (event_base, &IO->conn_fail);
307         ev_io_stop(loop, &IO->conn_event);
308         assert(IO->ConnFail);
309         IO->ConnFail(IO);
310 }
311 static void
312 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
313 {
314         AsyncIO *IO = watcher->data;
315
316         ev_io_stop(loop, &IO->conn_event);
317         ev_timer_stop (event_base, &IO->conn_fail);
318         set_start_callback(loop, IO, revents);
319 }
320 static void
321 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
322 {
323         ssize_t nbytes;
324         AsyncIO *IO = watcher->data;
325
326         nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
327         if (nbytes > 0) {
328                 HandleInbound(IO);
329         } else if (nbytes == 0) {
330                 assert(IO->Timeout);
331                 IO->Timeout(IO); /* this is a timeout... */
332                 return;
333         } else if (nbytes == -1) {
334 /// TODO: FD is gone. kick it.        sock_buff_invoke_free(sb, errno);
335                 return;
336         }
337 }
338
339 void
340 IO_postdns_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
341 {
342         AsyncIO *IO = watcher->data;
343         IO->PostDNS(IO);
344 }
345
346 eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout)
347 {
348         int fdflags; 
349         int rc = -1;
350
351         IO->SendBuf.fd = IO->RecvBuf.fd = 
352                 IO->sock = socket(
353                         (IO->IP6)?PF_INET6:PF_INET, 
354                         SOCK_STREAM, 
355                         IPPROTO_TCP);
356
357         if (IO->sock < 0) {
358                 CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno));
359                 StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno));
360 //              freeaddrinfo(res);
361                 return eAbort;
362         }
363         fdflags = fcntl(IO->sock, F_GETFL);
364         if (fdflags < 0) {
365                 CtdlLogPrintf(CTDL_DEBUG,
366                               "EVENT: unable to get socket flags! %s \n",
367                               strerror(errno));
368                 StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno));
369                 return eAbort;
370         }
371         fdflags = fdflags | O_NONBLOCK;
372         if (fcntl(IO->sock, F_SETFL, fdflags) < 0) {
373                 CtdlLogPrintf(CTDL_DEBUG,
374                               "EVENT: unable to set socket nonblocking flags! %s \n",
375                               strerror(errno));
376                 StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno));
377                 close(IO->sock);
378                 return eAbort;
379         }
380 /* TODO: maye we could use offsetof() to calc the position of data... 
381  * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
382  */
383         ev_io_init(&IO->recv_event, IO_recv_callback, IO->sock, EV_READ);
384         IO->recv_event.data = IO;
385         ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE);
386         IO->send_event.data = IO;
387
388         ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
389         IO->conn_fail.data = IO;
390         ev_timer_init(&IO->rw_timeout, IO_Timout_callback, first_rw_timeout, 0);
391         IO->rw_timeout.data = IO;
392
393         if (IO->IP6)
394                 rc = connect(IO->sock, &IO->Addr, sizeof(struct sockaddr_in6));
395         else
396                 rc = connect(IO->sock, (struct sockaddr_in *)&IO->Addr, sizeof(struct sockaddr_in));
397
398         if (rc >= 0){
399 ////            freeaddrinfo(res);
400                 set_start_callback(event_base, IO, 0);
401                 ev_timer_start(event_base, &IO->rw_timeout);
402                 return IO->NextState;
403         }
404         else if (errno == EINPROGRESS) {
405
406                 ev_io_init(&IO->conn_event, IO_connestd_callback, IO->sock, EV_READ|EV_WRITE);
407                 IO->conn_event.data = IO;
408
409                 ev_io_start(event_base, &IO->conn_event);
410                 ev_timer_start(event_base, &IO->conn_fail);
411                 return IO->NextState;
412         }
413         else {
414                 CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno));
415                 StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno));
416                 assert(IO->ConnFail);
417                 IO->ConnFail(IO);
418                 return eAbort;
419         }
420         return IO->NextState;
421 }
422
423 void SetNextTimeout(AsyncIO *IO, double timeout)
424 {
425         IO->rw_timeout.repeat = timeout;
426         ev_timer_again (event_base,  &IO->rw_timeout);
427 }
428
429 eNextState InitEventIO(AsyncIO *IO, 
430                        void *pData, 
431                        double conn_timeout, 
432                        double first_rw_timeout,
433                        int ReadFirst)
434 {
435         IO->Data = pData;
436         
437         if (ReadFirst) {
438                 IO->NextState = eReadMessage;
439         }
440         else {
441                 IO->NextState = eSendReply;
442         }
443         return event_connect_socket(IO, conn_timeout, first_rw_timeout);
444 }