EVENT: if we connect a remote port and have ipv4 bind socket, set this as our outboun...
[citadel.git] / citadel / event_client.c
1 /*
2  *
3  * Copyright (c) 1998-2012 by the citadel.org team
4  *
5  *  This program is open source software; you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation; either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, write to the Free Software
17  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18  */
19
20 #include "sysdep.h"
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <termios.h>
25 #include <fcntl.h>
26 #include <signal.h>
27 #include <pwd.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <syslog.h>
31
32 #if TIME_WITH_SYS_TIME
33 # include <sys/time.h>
34 # include <time.h>
35 #else
36 # if HAVE_SYS_TIME_H
37 #  include <sys/time.h>
38 # else
39 #  include <time.h>
40 # endif
41 #endif
42 #include <sys/wait.h>
43 #include <ctype.h>
44 #include <string.h>
45 #include <limits.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <assert.h>
50 #if HAVE_BACKTRACE
51 #include <execinfo.h>
52 #endif
53
54 #include <libcitadel.h>
55 #include "citadel.h"
56 #include "server.h"
57 #include "citserver.h"
58 #include "support.h"
59 #include "config.h"
60 #include "control.h"
61 #include "user_ops.h"
62 #include "database.h"
63 #include "msgbase.h"
64 #include "internet_addressing.h"
65 #include "genstamp.h"
66 #include "domain.h"
67 #include "clientsocket.h"
68 #include "locate_host.h"
69 #include "citadel_dirs.h"
70
71 #include "event_client.h"
72 #include "ctdl_module.h"
73
74 static void IO_abort_shutdown_callback(struct ev_loop *loop,
75                                        ev_cleanup *watcher,
76                                        int revents)
77 {
78         AsyncIO *IO = watcher->data;
79         EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
80
81         assert(IO->ShutdownAbort);
82         IO->ShutdownAbort(IO);
83 }
84
85
86 /*------------------------------------------------------------------------------
87  *                              Server DB IO
88  *----------------------------------------------------------------------------*/
89 extern int evdb_count;
90 extern pthread_mutex_t DBEventQueueMutex;
91 extern HashList *DBInboundEventQueue;
92 extern struct ev_loop *event_db;
93 extern ev_async DBAddJob;
94 extern ev_async DBExitEventLoop;
95
96 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
97 {
98         IOAddHandler *h;
99         int i;
100
101         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
102         h->IO = IO;
103         h->EvAttch = CB;
104         ev_cleanup_init(&IO->db_abort_by_shutdown,
105                         IO_abort_shutdown_callback);
106         IO->db_abort_by_shutdown.data = IO;
107         ev_cleanup_start(event_db, &IO->db_abort_by_shutdown);
108
109         pthread_mutex_lock(&DBEventQueueMutex);
110         EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
111         i = ++evdb_count ;
112         Put(DBInboundEventQueue, IKEY(i), h, NULL);
113         pthread_mutex_unlock(&DBEventQueueMutex);
114
115         ev_async_send (event_db, &DBAddJob);
116         EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
117         return eDBQuery;
118 }
119
120 void ShutDownDBCLient(AsyncIO *IO)
121 {
122         CitContext *Ctx =IO->CitContext;
123         become_session(Ctx);
124
125         EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
126         ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
127
128         assert(IO->Terminate);
129         IO->Terminate(IO);
130 }
131
132 void
133 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
134 {
135         AsyncIO *IO = watcher->data;
136         EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
137         become_session(IO->CitContext);
138
139         ev_idle_stop(event_db, &IO->db_unwind_stack);
140
141         assert(IO->NextDBOperation);
142         switch (IO->NextDBOperation(IO))
143         {
144         case eDBQuery:
145                 break;
146         case eSendDNSQuery:
147         case eReadDNSReply:
148         case eConnect:
149         case eSendReply:
150         case eSendMore:
151         case eSendFile:
152         case eReadMessage:
153         case eReadMore:
154         case eReadPayload:
155         case eReadFile:
156                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
157                 break;
158         case eTerminateConnection:
159         case eAbort:
160                 ev_idle_stop(event_db, &IO->db_unwind_stack);
161                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
162                 ShutDownDBCLient(IO);
163         }
164 }
165
166 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
167 {
168         IO->NextDBOperation = CB;
169         ev_idle_init(&IO->db_unwind_stack,
170                      DB_PerformNext);
171         IO->db_unwind_stack.data = IO;
172         ev_idle_start(event_db, &IO->db_unwind_stack);
173         return eDBQuery;
174 }
175
176 /*------------------------------------------------------------------------------
177  *                      Client IO
178  *----------------------------------------------------------------------------*/
179 extern int evbase_count;
180 extern pthread_mutex_t EventQueueMutex;
181 extern HashList *InboundEventQueue;
182 extern struct ev_loop *event_base;
183 extern ev_async AddJob;
184 extern ev_async ExitEventLoop;
185
186
187 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
188 {
189         IOAddHandler *h;
190         int i;
191
192         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
193         h->IO = IO;
194         h->EvAttch = CB;
195         ev_cleanup_init(&IO->abort_by_shutdown,
196                         IO_abort_shutdown_callback);
197         IO->abort_by_shutdown.data = IO;
198         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
199
200         pthread_mutex_lock(&EventQueueMutex);
201         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
202         i = ++evbase_count;
203         Put(InboundEventQueue, IKEY(i), h, NULL);
204         pthread_mutex_unlock(&EventQueueMutex);
205
206         ev_async_send (event_base, &AddJob);
207         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
208         return eSendReply;
209 }
210
211 extern eNextState evcurl_handle_start(AsyncIO *IO);
212
213 eNextState QueueCurlContext(AsyncIO *IO)
214 {
215         IOAddHandler *h;
216         int i;
217
218         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
219         h->IO = IO;
220         h->EvAttch = evcurl_handle_start;
221
222         pthread_mutex_lock(&EventQueueMutex);
223         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
224         i = ++evbase_count;
225         Put(InboundEventQueue, IKEY(i), h, NULL);
226         pthread_mutex_unlock(&EventQueueMutex);
227
228         ev_async_send (event_base, &AddJob);
229         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
230         return eSendReply;
231 }
232
233 void DestructCAres(AsyncIO *IO);
234 void FreeAsyncIOContents(AsyncIO *IO)
235 {
236         CitContext *Ctx = IO->CitContext;
237
238         FreeStrBuf(&IO->IOBuf);
239         FreeStrBuf(&IO->SendBuf.Buf);
240         FreeStrBuf(&IO->RecvBuf.Buf);
241
242         DestructCAres(IO);
243
244         FreeURL(&IO->ConnectMe);
245         FreeStrBuf(&IO->HttpReq.ReplyData);
246
247         if (Ctx) {
248                 Ctx->state = CON_IDLE;
249                 Ctx->kill_me = 1;
250         }
251 }
252
253
254 void StopClientWatchers(AsyncIO *IO)
255 {
256         ev_timer_stop (event_base, &IO->rw_timeout);
257         ev_timer_stop(event_base, &IO->conn_fail);
258         ev_idle_stop(event_base, &IO->unwind_stack);
259
260         ev_io_stop(event_base, &IO->conn_event);
261         ev_io_stop(event_base, &IO->send_event);
262         ev_io_stop(event_base, &IO->recv_event);
263
264         if (IO->SendBuf.fd != 0) {
265                 close(IO->SendBuf.fd);
266         }
267         IO->SendBuf.fd = 0;
268         IO->RecvBuf.fd = 0;
269 }
270
271 void ShutDownCLient(AsyncIO *IO)
272 {
273         CitContext *Ctx =IO->CitContext;
274         become_session(Ctx);
275
276         EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
277
278         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
279         StopClientWatchers(IO);
280
281         if (IO->DNS.Channel != NULL) {
282                 ares_destroy(IO->DNS.Channel);
283                 EV_DNS_LOG_STOP(DNS.recv_event);
284                 EV_DNS_LOG_STOP(DNS.send_event);
285                 ev_io_stop(event_base, &IO->DNS.recv_event);
286                 ev_io_stop(event_base, &IO->DNS.send_event);
287                 IO->DNS.Channel = NULL;
288         }
289         assert(IO->Terminate);
290         IO->Terminate(IO);
291 }
292
/*
 * Consume data already sitting in IO->RecvBuf and advance the client state
 * machine.
 *
 * While the buffer still holds data and IO->NextState is one of the read
 * states (eReadMessage, eReadMore, eReadFile, eReadPayload), one unit is
 * pulled per iteration: via WriteIOBAlreadyRead() in eReadFile state,
 * otherwise via IO->LineReader if set, else StrBufChunkSipLine().  Whenever
 * the reader returns anything but eMustReadMore, the recv watcher is
 * stopped, IO->ReadDone picks the next state, and the buffer is re-checked
 * with StrBufCheckBuffer().
 *
 * After the loop the watcher matching the resulting state is started, or the
 * client is shut down for eTerminateConnection / eAbort.
 *
 * Returns the last eReadState obtained.
 */
293 eReadState HandleInbound(AsyncIO *IO)
294 {
295         const char *Err = NULL;
296         eReadState Finished = eBufferNotEmpty;
297
298         become_session(IO->CitContext);
299
300         while ((Finished == eBufferNotEmpty) &&
301                ((IO->NextState == eReadMessage)||
302                 (IO->NextState == eReadMore)||
303                 (IO->NextState == eReadFile)||
304                 (IO->NextState == eReadPayload)))
305         {
306                 /* Reading lines...
307                  * lex line reply in callback,
308                  * or do it ourselves.
309                  * i.e. as nnn-blabla means continue reading in SMTP
310                  */
311                 if ((IO->NextState == eReadFile) &&
312                     (Finished == eBufferNotEmpty))
313                 {
                            /* Err is filled in by the callee but is not
                             * examined anywhere in this function. */
314                         Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
315                         if (Finished == eReadSuccess)
316                         {
317                                 IO->NextState = eSendReply;
318                         }
319                 }
320                 else if (IO->LineReader)
321                         Finished = IO->LineReader(IO);
322                 else
323                         Finished = StrBufChunkSipLine(IO->IOBuf,
324                                                       &IO->RecvBuf);
325
                    /* No per-result action is taken in this switch; in
                     * particular eReadFail is not handled specially. */
326                 switch (Finished) {
327                 case eMustReadMore: /// read new from socket...
328                         break;
329                 case eBufferNotEmpty: /* shouldn't happen... */
330                 case eReadSuccess: /// done for now...
331                         break;
332                 case eReadFail: /// WHUT?
333                                 ///todo: shut down!
334                         break;
335                 }
336
                    /* Anything but "need more data" is passed on to ReadDone,
                     * which decides the next state; Finished is then refreshed
                     * from the receive buffer to drive the loop condition. */
337                 if (Finished != eMustReadMore) {
338                         assert(IO->ReadDone);
339                         ev_io_stop(event_base, &IO->recv_event);
340                         IO->NextState = IO->ReadDone(IO);
341                         Finished = StrBufCheckBuffer(&IO->RecvBuf);
342                 }
343         }
344
            /* Arm the watcher that fits the state we ended up in. */
345         switch (IO->NextState) {
346         case eSendFile:
347                 ev_io_start(event_base, &IO->send_event);
348                 break;
349         case eSendReply:
350         case eSendMore:
                    /* SendDone picks the follow-up state; the send watcher
                     * is started regardless of what it returns. */
351                 assert(IO->SendDone);
352                 IO->NextState = IO->SendDone(IO);
353                 ev_io_start(event_base, &IO->send_event);
354                 break;
355         case eReadPayload:
356         case eReadMore:
357         case eReadFile:
358                 ev_io_start(event_base, &IO->recv_event);
359                 break;
360         case eTerminateConnection:
361                 ShutDownCLient(IO);
362                 break;
363         case eAbort:
364                 ShutDownCLient(IO);
365                 break;
            /* Nothing is started or stopped here for the remaining states. */
366         case eSendDNSQuery:
367         case eReadDNSReply:
368         case eDBQuery:
369         case eConnect:
370         case eReadMessage:
371                 break;
372         }
373         return Finished;
374 }
375
376
/*
 * libev write-readiness callback for the client socket; also called
 * directly by set_start_callback().
 *
 * Sends the next piece of outbound data -- FileSendChunked() in eSendFile
 * state, StrBuf_write_one_chunk_callback() on IO->SendBuf otherwise -- and
 * acts on the result rc:
 *   rc == 0  the send watcher is stopped and the state machine is advanced
 *            according to IO->NextState (see the switch below);
 *   rc <  0  IO->Timeout is invoked;
 *   rc >  0  nothing is done here (more remains to be written).
 *
 * With BIGBAD_IODBG defined, the pending buffer contents and the result are
 * additionally appended to /tmp/foolog_ev_<ServiceName>.<fd>.
 */
377 static void
378 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
379 {
380         int rc;
381         AsyncIO *IO = watcher->data;
382         const char *errmsg = NULL;
383
384         become_session(IO->CitContext);
    /* NOTE: the debug block's opening brace is inside this #ifdef and its
     * closing brace inside the one further down, so the switch below sits
     * in that extra scope only when BIGBAD_IODBG is defined. */
385 #ifdef BIGBAD_IODBG
386         {
387                 int rv = 0;
388                 char fn [SIZ];
389                 FILE *fd;
390                 const char *pch = ChrPtr(IO->SendBuf.Buf);
391                 const char *pchh = IO->SendBuf.ReadWritePointer;
392                 long nbytes;
393
394                 if (pchh == NULL)
395                         pchh = pch;
396
397                 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
398                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
399                          ((CitContext*)(IO->CitContext))->ServiceName,
400                          IO->SendBuf.fd);
401
                    /* NOTE(review): the fopen() result is used unchecked. */
402                 fd = fopen(fn, "a+");
403                 fprintf(fd, "Send: BufSize: %ld BufContent: [",
404                         nbytes);
405                 rv = fwrite(pchh, nbytes, 1, fd);
406                 if (!rv) printf("failed to write debug to %s!\n", fn);
407                 fprintf(fd, "]\n");
408 #endif
409                 switch (IO->NextState) {
410                 case eSendFile:
411                         rc = FileSendChunked(&IO->IOB, &errmsg);
412                         if (rc < 0)
413                                 StrBufPlain(IO->ErrMsg, errmsg, -1);
414                         break;
415                 default:
416                         rc = StrBuf_write_one_chunk_callback(watcher->fd,
417                                                              0/*TODO*/,
418                                                              &IO->SendBuf);
419                 }
420
421 #ifdef BIGBAD_IODBG
422                 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
423                 fclose(fd);
424         }
425 #endif
426         if (rc == 0)
427         {
428                 ev_io_stop(event_base, &IO->send_event);
429                 switch (IO->NextState) {
430                 case eSendMore:
                            /* SendDone picks the next state: shut down on
                             * terminate/abort, otherwise keep sending. */
431                         assert(IO->SendDone);
432                         IO->NextState = IO->SendDone(IO);
433
434                         if ((IO->NextState == eTerminateConnection) ||
435                             (IO->NextState == eAbort) )
436                                 ShutDownCLient(IO);
437                         else {
438                                 ev_io_start(event_base, &IO->send_event);
439                         }
440                         break;
441                 case eSendFile:
                            /* NOTE(review): with chunk data still remaining
                             * the *recv* watcher is started here -- confirm
                             * this is intended. */
442                         if (IO->IOB.ChunkSendRemain > 0) {
443                                 ev_io_start(event_base, &IO->recv_event);
444                         } else {
445                                 assert(IO->ReadDone);
446                                 IO->NextState = IO->ReadDone(IO);
                                    /* Only the send states restart a watcher
                                     * here; the read and terminate/abort
                                     * states take no action in this switch. */
447                                 switch(IO->NextState) {
448                                 case eSendDNSQuery:
449                                 case eReadDNSReply:
450                                 case eDBQuery:
451                                 case eConnect:
452                                         break;
453                                 case eSendReply:
454                                 case eSendMore:
455                                 case eSendFile:
456                                         ev_io_start(event_base,
457                                                     &IO->send_event);
458                                         break;
459                                 case eReadMessage:
460                                 case eReadMore:
461                                 case eReadPayload:
462                                 case eReadFile:
463                                         break;
464                                 case eTerminateConnection:
465                                 case eAbort:
466                                         break;
467                                 }
468                         }
469                         break;
470                 case eSendReply:
                        /* Leave if StrBufCheckBuffer() does not report
                         * eReadSuccess for the send buffer; otherwise switch
                         * to eReadMore and deliberately fall through into
                         * the read handling below. */
471                     if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
472                         break;
473                     IO->NextState = eReadMore;
474                 case eReadMore:
475                 case eReadMessage:
476                 case eReadPayload:
477                 case eReadFile:
                            /* Process data that is already buffered right
                             * away; otherwise wait for the socket to become
                             * readable. */
478                         if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
479                         {
480                                 HandleInbound(IO);
481                         }
482                         else {
483                                 ev_io_start(event_base, &IO->recv_event);
484                         }
485
486                         break;
487                 case eDBQuery:
488                         /*
489                          * we now live in another queue,
490                          * so we have to unregister.
491                          */
492                         ev_cleanup_stop(loop, &IO->abort_by_shutdown);
493                         break;
494                 case eSendDNSQuery:
495                 case eReadDNSReply:
496                 case eConnect:
497                 case eTerminateConnection:
498                 case eAbort:
499                         break;
500                 }
501         }
502         else if (rc < 0) {
                    /* A failed write is reported through the Timeout handler;
                     * its return value is ignored here. */
503                 assert(IO->Timeout);
504                 IO->Timeout(IO);
505         }
506         /* else : must write more. */
507 }
508 static void
509 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
510 {
511         switch(IO->NextState) {
512         case eReadMore:
513         case eReadMessage:
514         case eReadFile:
515                 ev_io_start(event_base, &IO->recv_event);
516                 break;
517         case eSendReply:
518         case eSendMore:
519         case eReadPayload:
520         case eSendFile:
521                 become_session(IO->CitContext);
522                 IO_send_callback(loop, &IO->send_event, revents);
523                 break;
524         case eDBQuery:
525         case eSendDNSQuery:
526         case eReadDNSReply:
527         case eConnect:
528         case eTerminateConnection:
529         case eAbort:
530                 /// TODO: WHUT?
531                 break;
532         }
533 }
534
535 static void
536 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
537 {
538         AsyncIO *IO = watcher->data;
539
540         ev_timer_stop (event_base, &IO->rw_timeout);
541         become_session(IO->CitContext);
542
543         if (IO->SendBuf.fd != 0)
544         {
545                 ev_io_stop(event_base, &IO->send_event);
546                 ev_io_stop(event_base, &IO->recv_event);
547                 ev_timer_stop (event_base, &IO->rw_timeout);
548                 close(IO->SendBuf.fd);
549                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
550         }
551
552         assert(IO->Timeout);
553         switch (IO->Timeout(IO))
554         {
555         case eAbort:
556                 ShutDownCLient(IO);
557         default:
558                 break;
559         }
560 }
561
562 static void
563 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
564 {
565         AsyncIO *IO = watcher->data;
566
567         ev_timer_stop (event_base, &IO->conn_fail);
568
569         if (IO->SendBuf.fd != 0)
570         {
571                 ev_io_stop(loop, &IO->conn_event);
572                 ev_io_stop(event_base, &IO->send_event);
573                 ev_io_stop(event_base, &IO->recv_event);
574                 ev_timer_stop (event_base, &IO->rw_timeout);
575                 close(IO->SendBuf.fd);
576                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
577         }
578         become_session(IO->CitContext);
579
580         assert(IO->ConnFail);
581         switch (IO->ConnFail(IO))
582         {
583         case eAbort:
584                 ShutDownCLient(IO);
585         default:
586                 break;
587
588         }
589 }
590
591 static void
592 IO_connfailimmediate_callback(struct ev_loop *loop,
593                               ev_idle *watcher,
594                               int revents)
595 {
596         AsyncIO *IO = watcher->data;
597
598         ev_idle_stop (event_base, &IO->conn_fail_immediate);
599
600         if (IO->SendBuf.fd != 0)
601         {
602                 close(IO->SendBuf.fd);
603                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
604         }
605         become_session(IO->CitContext);
606
607         assert(IO->ConnFail);
608         switch (IO->ConnFail(IO))
609         {
610         case eAbort:
611                 ShutDownCLient(IO);
612         default:
613                 break;
614
615         }
616 }
617
618 static void
619 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
620 {
621         AsyncIO *IO = watcher->data;
622
623         ev_io_stop(loop, &IO->conn_event);
624         ev_timer_stop (event_base, &IO->conn_fail);
625         set_start_callback(loop, IO, revents);
626 }
627 static void
628 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
629 {
630         const char *errmsg;
631         ssize_t nbytes;
632         AsyncIO *IO = watcher->data;
633
634         switch (IO->NextState) {
635         case eReadFile:
636                 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
637                 if (nbytes < 0)
638                         StrBufPlain(IO->ErrMsg, errmsg, -1);
639                 else
640                 {
641                         if (IO->IOB.ChunkSendRemain == 0)
642                         {
643                                 IO->NextState = eSendReply;
644                         }
645                         else
646                                 return;
647                 }
648                 break;
649         default:
650                 nbytes = StrBuf_read_one_chunk_callback(watcher->fd,
651                                                         0 /*TODO */,
652                                                         &IO->RecvBuf);
653                 break;
654         }
655
656 #ifdef BIGBAD_IODBG
657         {
658                 long nbytes;
659                 int rv = 0;
660                 char fn [SIZ];
661                 FILE *fd;
662                 const char *pch = ChrPtr(IO->RecvBuf.Buf);
663                 const char *pchh = IO->RecvBuf.ReadWritePointer;
664
665                 if (pchh == NULL)
666                         pchh = pch;
667
668                 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
669                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
670                          ((CitContext*)(IO->CitContext))->ServiceName,
671                          IO->SendBuf.fd);
672
673                 fd = fopen(fn, "a+");
674                 fprintf(fd, "Read: BufSize: %ld BufContent: [",
675                         nbytes);
676                 rv = fwrite(pchh, nbytes, 1, fd);
677                 if (!rv) printf("failed to write debug to %s!\n", fn);
678                 fprintf(fd, "]\n");
679                 fclose(fd);
680         }
681 #endif
682         if (nbytes > 0) {
683                 HandleInbound(IO);
684         } else if (nbytes == 0) {
685                 assert(IO->Timeout);
686
687                 switch (IO->Timeout(IO))
688                 {
689                 case eAbort:
690                         ShutDownCLient(IO);
691                 default:
692                         break;
693                 }
694                 return;
695         } else if (nbytes == -1) {
696                 // FD is gone. kick it. 
697                 StopClientWatchers(IO);
698                 EV_syslog(LOG_DEBUG,
699                           "EVENT: Socket Invalid! %s \n",
700                           strerror(errno));
701                 ShutDownCLient(IO);
702                 return;
703         }
704 }
705
706 void
707 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
708 {
709         AsyncIO *IO = watcher->data;
710         EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
711         become_session(IO->CitContext);
712         assert(IO->DNS.Query->PostDNS);
713         switch (IO->DNS.Query->PostDNS(IO))
714         {
715         case eAbort:
716                 assert(IO->DNS.Fail);
717                 switch (IO->DNS.Fail(IO)) {
718                 case eAbort:
719 ////                    StopClientWatchers(IO);
720                         ShutDownCLient(IO);
721                 default:
722                         break;
723                 }
724         default:
725                 break;
726         }
727 }
728
729
730 eNextState EvConnectSock(AsyncIO *IO,
731                          double conn_timeout,
732                          double first_rw_timeout,
733                          int ReadFirst)
734 {
735         struct sockaddr_in egress_sin;
736         int fdflags;
737         int rc = -1;
738
739         become_session(IO->CitContext);
740
741         if (ReadFirst) {
742                 IO->NextState = eReadMessage;
743         }
744         else {
745                 IO->NextState = eSendReply;
746         }
747
748         IO->SendBuf.fd = IO->RecvBuf.fd =
749                 socket(
750                         (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
751                         SOCK_STREAM,
752                         IPPROTO_TCP);
753
754         if (IO->SendBuf.fd < 0) {
755                 EV_syslog(LOG_ERR,
756                           "EVENT: socket() failed: %s\n",
757                           strerror(errno));
758
759                 StrBufPrintf(IO->ErrMsg,
760                              "Failed to create socket: %s",
761                              strerror(errno));
762                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
763                 return eAbort;
764         }
765         fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
766         if (fdflags < 0) {
767                 EV_syslog(LOG_DEBUG,
768                           "EVENT: unable to get socket flags! %s \n",
769                           strerror(errno));
770                 StrBufPrintf(IO->ErrMsg,
771                              "Failed to get socket flags: %s",
772                              strerror(errno));
773                 close(IO->SendBuf.fd);
774                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
775                 return eAbort;
776         }
777         fdflags = fdflags | O_NONBLOCK;
778         if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
779                 EV_syslog(
780                         LOG_DEBUG,
781                         "EVENT: unable to set socket nonblocking flags! %s \n",
782                         strerror(errno));
783                 StrBufPrintf(IO->ErrMsg,
784                              "Failed to set socket flags: %s",
785                              strerror(errno));
786                 close(IO->SendBuf.fd);
787                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
788                 return eAbort;
789         }
790 /* TODO: maye we could use offsetof() to calc the position of data...
791  * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
792  */
793         ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
794         IO->recv_event.data = IO;
795         ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
796         IO->send_event.data = IO;
797
798         ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
799         IO->conn_fail.data = IO;
800         ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
801         IO->rw_timeout.data = IO;
802
803
804
805
806         /* for debugging you may bypass it like this:
807          * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
808          * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
809          *   inet_addr("127.0.0.1");
810          */
811         if (IO->ConnectMe->IPv6) {
812                 rc = connect(IO->SendBuf.fd,
813                              &IO->ConnectMe->Addr,
814                              sizeof(struct sockaddr_in6));
815         }
816         else {
817                 /* If citserver is bound to a specific IP address on the host, make
818                  * sure we use that address for outbound connections.
819                  */
820         
821                 memset(&egress_sin, 0, sizeof(egress_sin));
822                 egress_sin.sin_family = AF_INET;
823                 if (!IsEmptyStr(config.c_ip_addr)) {
824                         egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
825                         if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
826                                 egress_sin.sin_addr.s_addr = INADDR_ANY;
827                         }
828
829                         /* If this bind fails, no problem; we can still use INADDR_ANY */
830                         bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
831                 }
832                 rc = connect(IO->SendBuf.fd,
833                              (struct sockaddr_in *)&IO->ConnectMe->Addr,
834                              sizeof(struct sockaddr_in));
835         }
836
837         if (rc >= 0){
838                 EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
839                 set_start_callback(event_base, IO, 0);
840                 ev_timer_start(event_base, &IO->rw_timeout);
841                 return IO->NextState;
842         }
843         else if (errno == EINPROGRESS) {
844                 EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n");
845
846                 ev_io_init(&IO->conn_event,
847                            IO_connestd_callback,
848                            IO->SendBuf.fd,
849                            EV_READ|EV_WRITE);
850
851                 IO->conn_event.data = IO;
852
853                 ev_io_start(event_base, &IO->conn_event);
854                 ev_timer_start(event_base, &IO->conn_fail);
855                 return IO->NextState;
856         }
857         else {
858                 ev_idle_init(&IO->conn_fail_immediate,
859                              IO_connfailimmediate_callback);
860                 IO->conn_fail_immediate.data = IO;
861                 ev_idle_start(event_base, &IO->conn_fail_immediate);
862
863                 EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno));
864                 StrBufPrintf(IO->ErrMsg,
865                              "Failed to connect: %s",
866                              strerror(errno));
867                 return IO->NextState;
868         }
869         return IO->NextState;
870 }
871
872 void SetNextTimeout(AsyncIO *IO, double timeout)
873 {
874         IO->rw_timeout.repeat = timeout;
875         ev_timer_again (event_base,  &IO->rw_timeout);
876 }
877
878
879 eNextState ReAttachIO(AsyncIO *IO,
880                       void *pData,
881                       int ReadFirst)
882 {
883         IO->Data = pData;
884         become_session(IO->CitContext);
885         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
886         if (ReadFirst) {
887                 IO->NextState = eReadMessage;
888         }
889         else {
890                 IO->NextState = eSendReply;
891         }
892         set_start_callback(event_base, IO, 0);
893
894         return IO->NextState;
895 }
896
897 void InitIOStruct(AsyncIO *IO,
898                   void *Data,
899                   eNextState NextState,
900                   IO_LineReaderCallback LineReader,
901                   IO_CallBack DNS_Fail,
902                   IO_CallBack SendDone,
903                   IO_CallBack ReadDone,
904                   IO_CallBack Terminate,
905                   IO_CallBack ConnFail,
906                   IO_CallBack Timeout,
907                   IO_CallBack ShutdownAbort)
908 {
909         IO->Data          = Data;
910
911         IO->CitContext    = CloneContext(CC);
912         ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
913
914         IO->NextState     = NextState;
915
916         IO->SendDone      = SendDone;
917         IO->ReadDone      = ReadDone;
918         IO->Terminate     = Terminate;
919         IO->LineReader    = LineReader;
920         IO->ConnFail      = ConnFail;
921         IO->Timeout       = Timeout;
922         IO->ShutdownAbort = ShutdownAbort;
923
924         IO->DNS.Fail      = DNS_Fail;
925
926         IO->SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
927         IO->RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
928         IO->IOBuf         = NewStrBuf();
929         EV_syslog(LOG_DEBUG,
930                   "EVENT: Session lives at %p IO at %p \n",
931                   Data, IO);
932
933 }
934
935 extern int evcurl_init(AsyncIO *IO);
936
937 int InitcURLIOStruct(AsyncIO *IO,
938                      void *Data,
939                      const char* Desc,
940                      IO_CallBack SendDone,
941                      IO_CallBack Terminate,
942                      IO_CallBack ShutdownAbort)
943 {
944         IO->Data          = Data;
945
946         IO->CitContext    = CloneContext(CC);
947         ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
948
949         IO->SendDone = SendDone;
950         IO->Terminate = Terminate;
951         IO->ShutdownAbort = ShutdownAbort;
952
953         strcpy(IO->HttpReq.errdesc, Desc);
954
955
956         return  evcurl_init(IO);
957
958 }
959
960 void EV_backtrace(AsyncIO *IO)
961 {
962 #ifdef HAVE_BACKTRACE
963         void *stack_frames[50];
964         size_t size, i;
965         char **strings;
966
967
968         size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
969         strings = backtrace_symbols(stack_frames, size);
970         for (i = 0; i < size; i++) {
971                 if (strings != NULL)
972                         EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
973                 else
974                         EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
975         }
976         free(strings);
977 #endif
978 }
979
980
981 ev_tstamp ctdl_ev_now (void)
982 {
983         return ev_now(event_base);
984 }