Style cleanup
[citadel.git] / citadel / event_client.c
1 /*
2  *
3  * Copyright (c) 1998-2012 by the citadel.org team
4  *
5  *  This program is open source software; you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation; either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, write to the Free Software
17  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18  */
19
20 #include "sysdep.h"
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <termios.h>
25 #include <fcntl.h>
26 #include <signal.h>
27 #include <pwd.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <syslog.h>
31
32 #if TIME_WITH_SYS_TIME
33 # include <sys/time.h>
34 # include <time.h>
35 #else
36 # if HAVE_SYS_TIME_H
37 #  include <sys/time.h>
38 # else
39 #  include <time.h>
40 # endif
41 #endif
42 #include <sys/wait.h>
43 #include <ctype.h>
44 #include <string.h>
45 #include <limits.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <assert.h>
50
51 #include <libcitadel.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56 #include "config.h"
57 #include "control.h"
58 #include "user_ops.h"
59 #include "database.h"
60 #include "msgbase.h"
61 #include "internet_addressing.h"
62 #include "genstamp.h"
63 #include "domain.h"
64 #include "clientsocket.h"
65 #include "locate_host.h"
66 #include "citadel_dirs.h"
67
68 #include "event_client.h"
69
70 static void IO_abort_shutdown_callback(struct ev_loop *loop,
71                                        ev_cleanup *watcher,
72                                        int revents)
73 {
74         AsyncIO *IO = watcher->data;
75         EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
76
77         assert(IO->ShutdownAbort);
78         IO->ShutdownAbort(IO);
79 }
80
81
82 /*------------------------------------------------------------------------------
83  *                              Server DB IO
84  *----------------------------------------------------------------------------*/
85 extern int evdb_count;
86 extern pthread_mutex_t DBEventQueueMutex;
87 extern HashList *DBInboundEventQueue;
88 extern struct ev_loop *event_db;
89 extern ev_async DBAddJob;
90 extern ev_async DBExitEventLoop;
91
92 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
93 {
94         IOAddHandler *h;
95         int i;
96
97         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
98         h->IO = IO;
99         h->EvAttch = CB;
100         ev_cleanup_init(&IO->db_abort_by_shutdown,
101                         IO_abort_shutdown_callback);
102         IO->db_abort_by_shutdown.data = IO;
103         ev_cleanup_start(event_db, &IO->db_abort_by_shutdown);
104
105         pthread_mutex_lock(&DBEventQueueMutex);
106         EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
107         i = ++evdb_count ;
108         Put(DBInboundEventQueue, IKEY(i), h, NULL);
109         pthread_mutex_unlock(&DBEventQueueMutex);
110
111         ev_async_send (event_db, &DBAddJob);
112         EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
113         return eDBQuery;
114 }
115
116 void ShutDownDBCLient(AsyncIO *IO)
117 {
118         CitContext *Ctx =IO->CitContext;
119         become_session(Ctx);
120
121         EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
122         ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
123
124         assert(IO->Terminate);
125         IO->Terminate(IO);
126 }
127
128 void
129 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
130 {
131         AsyncIO *IO = watcher->data;
132         EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
133         become_session(IO->CitContext);
134
135         ev_idle_stop(event_db, &IO->db_unwind_stack);
136
137         assert(IO->NextDBOperation);
138         switch (IO->NextDBOperation(IO))
139         {
140         case eDBQuery:
141                 break;
142         case eSendDNSQuery:
143         case eReadDNSReply:
144         case eConnect:
145         case eSendReply:
146         case eSendMore:
147         case eSendFile:
148         case eReadMessage:
149         case eReadMore:
150         case eReadPayload:
151         case eReadFile:
152                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
153                 break;
154         case eTerminateConnection:
155         case eAbort:
156                 ev_idle_stop(event_db, &IO->db_unwind_stack);
157                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
158                 ShutDownDBCLient(IO);
159         }
160 }
161
162 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
163 {
164         IO->NextDBOperation = CB;
165         ev_idle_init(&IO->db_unwind_stack,
166                      DB_PerformNext);
167         IO->db_unwind_stack.data = IO;
168         ev_idle_start(event_db, &IO->db_unwind_stack);
169         return eDBQuery;
170 }
171
172 /*------------------------------------------------------------------------------
173  *                      Client IO
174  *----------------------------------------------------------------------------*/
175 extern int evbase_count;
176 extern pthread_mutex_t EventQueueMutex;
177 extern HashList *InboundEventQueue;
178 extern struct ev_loop *event_base;
179 extern ev_async AddJob;
180 extern ev_async ExitEventLoop;
181
182
183 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
184 {
185         IOAddHandler *h;
186         int i;
187
188         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
189         h->IO = IO;
190         h->EvAttch = CB;
191         ev_cleanup_init(&IO->abort_by_shutdown,
192                         IO_abort_shutdown_callback);
193         IO->abort_by_shutdown.data = IO;
194         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
195
196         pthread_mutex_lock(&EventQueueMutex);
197         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
198         i = ++evbase_count;
199         Put(InboundEventQueue, IKEY(i), h, NULL);
200         pthread_mutex_unlock(&EventQueueMutex);
201
202         ev_async_send (event_base, &AddJob);
203         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
204         return eSendReply;
205 }
206
207 extern eNextState evcurl_handle_start(AsyncIO *IO);
208
209 eNextState QueueCurlContext(AsyncIO *IO)
210 {
211         IOAddHandler *h;
212         int i;
213
214         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
215         h->IO = IO;
216         h->EvAttch = evcurl_handle_start;
217
218         pthread_mutex_lock(&EventQueueMutex);
219         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
220         i = ++evbase_count;
221         Put(InboundEventQueue, IKEY(i), h, NULL);
222         pthread_mutex_unlock(&EventQueueMutex);
223
224         ev_async_send (event_base, &AddJob);
225         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
226         return eSendReply;
227 }
228
229 void DestructCAres(AsyncIO *IO);
230 void FreeAsyncIOContents(AsyncIO *IO)
231 {
232         CitContext *Ctx = IO->CitContext;
233
234         FreeStrBuf(&IO->IOBuf);
235         FreeStrBuf(&IO->SendBuf.Buf);
236         FreeStrBuf(&IO->RecvBuf.Buf);
237
238         DestructCAres(IO);
239
240         FreeURL(&IO->ConnectMe);
241         FreeStrBuf(&IO->HttpReq.ReplyData);
242
243         Ctx->state = CON_IDLE;
244         Ctx->kill_me = 1;
245 }
246
247
248 void StopClientWatchers(AsyncIO *IO)
249 {
250         ev_timer_stop (event_base, &IO->rw_timeout);
251         ev_timer_stop(event_base, &IO->conn_fail);
252         ev_idle_stop(event_base, &IO->unwind_stack);
253
254         ev_io_stop(event_base, &IO->conn_event);
255         ev_io_stop(event_base, &IO->send_event);
256         ev_io_stop(event_base, &IO->recv_event);
257         close(IO->SendBuf.fd);
258         IO->SendBuf.fd = 0;
259         IO->RecvBuf.fd = 0;
260 }
261
262 void ShutDownCLient(AsyncIO *IO)
263 {
264         CitContext *Ctx =IO->CitContext;
265         become_session(Ctx);
266
267         EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
268
269         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
270         StopClientWatchers(IO);
271
272         if (IO->DNS.Channel != NULL) {
273                 ares_destroy(IO->DNS.Channel);
274                 ev_io_stop(event_base, &IO->DNS.recv_event);
275                 ev_io_stop(event_base, &IO->DNS.send_event);
276                 IO->DNS.Channel = NULL;
277         }
278         assert(IO->Terminate);
279         IO->Terminate(IO);
280 }
281
282
283 eReadState HandleInbound(AsyncIO *IO)
284 {
285         const char *Err = NULL;
286         eReadState Finished = eBufferNotEmpty;
287
288         become_session(IO->CitContext);
289
290         while ((Finished == eBufferNotEmpty) &&
291                ((IO->NextState == eReadMessage)||
292                 (IO->NextState == eReadMore)||
293                 (IO->NextState == eReadFile)||
294                 (IO->NextState == eReadPayload)))
295         {
296                 /* Reading lines...
297                  * lex line reply in callback,
298                  * or do it ourselves.
299                  * i.e. as nnn-blabla means continue reading in SMTP
300                  */
301                 if ((IO->NextState == eReadFile) &&
302                     (Finished == eBufferNotEmpty))
303                 {
304                         Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
305                         if (Finished == eReadSuccess)
306                         {
307                                 IO->NextState = eSendReply;
308                         }
309                 }
310                 else if (IO->LineReader)
311                         Finished = IO->LineReader(IO);
312                 else
313                         Finished = StrBufChunkSipLine(IO->IOBuf,
314                                                       &IO->RecvBuf);
315
316                 switch (Finished) {
317                 case eMustReadMore: /// read new from socket...
318                         break;
319                 case eBufferNotEmpty: /* shouldn't happen... */
320                 case eReadSuccess: /// done for now...
321                         break;
322                 case eReadFail: /// WHUT?
323                                 ///todo: shut down!
324                         break;
325                 }
326
327                 if (Finished != eMustReadMore) {
328                         assert(IO->ReadDone);
329                         ev_io_stop(event_base, &IO->recv_event);
330                         IO->NextState = IO->ReadDone(IO);
331                         Finished = StrBufCheckBuffer(&IO->RecvBuf);
332                 }
333         }
334
335         switch (IO->NextState) {
336         case eSendFile:
337                 ev_io_start(event_base, &IO->send_event);
338                 break;
339         case eSendReply:
340         case eSendMore:
341                 assert(IO->SendDone);
342                 IO->NextState = IO->SendDone(IO);
343                 ev_io_start(event_base, &IO->send_event);
344                 break;
345         case eReadPayload:
346         case eReadMore:
347         case eReadFile:
348                 ev_io_start(event_base, &IO->recv_event);
349                 break;
350         case eTerminateConnection:
351 //////TODOxxxx
352                 break;
353         case eAbort:
354                 ShutDownCLient(IO);
355                 break;
356         case eSendDNSQuery:
357         case eReadDNSReply:
358         case eDBQuery:
359         case eConnect:
360         case eReadMessage:
361                 break;
362         }
363         return Finished;
364 }
365
366
367 static void
368 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
369 {
370         int rc;
371         AsyncIO *IO = watcher->data;
372         const char *errmsg = NULL;
373
374         become_session(IO->CitContext);
375 #ifdef BIGBAD_IODBG
376         {
377                 int rv = 0;
378                 char fn [SIZ];
379                 FILE *fd;
380                 const char *pch = ChrPtr(IO->SendBuf.Buf);
381                 const char *pchh = IO->SendBuf.ReadWritePointer;
382                 long nbytes;
383
384                 if (pchh == NULL)
385                         pchh = pch;
386
387                 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
388                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
389                          ((CitContext*)(IO->CitContext))->ServiceName,
390                          IO->SendBuf.fd);
391
392                 fd = fopen(fn, "a+");
393                 fprintf(fd, "Send: BufSize: %ld BufContent: [",
394                         nbytes);
395                 rv = fwrite(pchh, nbytes, 1, fd);
396                 if (!rv) printf("failed to write debug to %s!\n", fn);
397                 fprintf(fd, "]\n");
398 #endif
399                 switch (IO->NextState) {
400                 case eSendFile:
401                         rc = FileSendChunked(&IO->IOB, &errmsg);
402                         if (rc < 0)
403                                 StrBufPlain(IO->ErrMsg, errmsg, -1);
404                         break;
405                 default:
406                         rc = StrBuf_write_one_chunk_callback(watcher->fd,
407                                                              0/*TODO*/,
408                                                              &IO->SendBuf);
409                 }
410
411 #ifdef BIGBAD_IODBG
412                 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
413                 fclose(fd);
414         }
415 #endif
416         if (rc == 0)
417         {
418                 ev_io_stop(event_base, &IO->send_event);
419                 switch (IO->NextState) {
420                 case eSendMore:
421                         assert(IO->SendDone);
422                         IO->NextState = IO->SendDone(IO);
423
424                         if ((IO->NextState == eTerminateConnection) ||
425                             (IO->NextState == eAbort) )
426                                 ShutDownCLient(IO);
427                         else {
428                                 ev_io_start(event_base, &IO->send_event);
429                         }
430                         break;
431                 case eSendFile:
432                         if (IO->IOB.ChunkSendRemain > 0) {
433                                 ev_io_start(event_base, &IO->recv_event);
434                         } else {
435                                 assert(IO->ReadDone);
436                                 IO->NextState = IO->ReadDone(IO);
437                                 switch(IO->NextState) {
438                                 case eSendDNSQuery:
439                                 case eReadDNSReply:
440                                 case eDBQuery:
441                                 case eConnect:
442                                         break;
443                                 case eSendReply:
444                                 case eSendMore:
445                                 case eSendFile:
446                                         ev_io_start(event_base,
447                                                     &IO->send_event);
448                                         break;
449                                 case eReadMessage:
450                                 case eReadMore:
451                                 case eReadPayload:
452                                 case eReadFile:
453                                         break;
454                                 case eTerminateConnection:
455                                 case eAbort:
456                                         break;
457                                 }
458                         }
459                         break;
460                 case eSendReply:
461                     if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
462                         break;
463                     IO->NextState = eReadMore;
464                 case eReadMore:
465                 case eReadMessage:
466                 case eReadPayload:
467                 case eReadFile:
468                         if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
469                         {
470                                 HandleInbound(IO);
471                         }
472                         else {
473                                 ev_io_start(event_base, &IO->recv_event);
474                         }
475
476                         break;
477                 case eDBQuery:
478                         /*
479                          * we now live in another queue,
480                          * so we have to unregister.
481                          */
482                         ev_cleanup_stop(loop, &IO->abort_by_shutdown);
483                         break;
484                 case eSendDNSQuery:
485                 case eReadDNSReply:
486                 case eConnect:
487                 case eTerminateConnection:
488                 case eAbort:
489                         break;
490                 }
491         }
492         else if (rc < 0) {
493                 assert(IO->Timeout);
494                 IO->Timeout(IO);
495         }
496         /* else : must write more. */
497 }
498 static void
499 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
500 {
501         switch(IO->NextState) {
502         case eReadMore:
503         case eReadMessage:
504         case eReadFile:
505                 ev_io_start(event_base, &IO->recv_event);
506                 break;
507         case eSendReply:
508         case eSendMore:
509         case eReadPayload:
510         case eSendFile:
511                 become_session(IO->CitContext);
512                 IO_send_callback(loop, &IO->send_event, revents);
513                 break;
514         case eDBQuery:
515         case eSendDNSQuery:
516         case eReadDNSReply:
517         case eConnect:
518         case eTerminateConnection:
519         case eAbort:
520                 /// TODO: WHUT?
521                 break;
522         }
523 }
524
525 static void
526 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
527 {
528         AsyncIO *IO = watcher->data;
529
530         ev_timer_stop (event_base, &IO->rw_timeout);
531         become_session(IO->CitContext);
532
533         if (IO->SendBuf.fd != 0)
534         {
535                 ev_io_stop(event_base, &IO->send_event);
536                 ev_io_stop(event_base, &IO->recv_event);
537                 ev_timer_stop (event_base, &IO->rw_timeout);
538                 close(IO->SendBuf.fd);
539                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
540         }
541
542         assert(IO->Timeout);
543         switch (IO->Timeout(IO))
544         {
545         case eAbort:
546                 ShutDownCLient(IO);
547         default:
548                 break;
549         }
550 }
551
552 static void
553 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
554 {
555         AsyncIO *IO = watcher->data;
556
557         ev_timer_stop (event_base, &IO->conn_fail);
558
559         if (IO->SendBuf.fd != 0)
560         {
561                 ev_io_stop(loop, &IO->conn_event);
562                 ev_io_stop(event_base, &IO->send_event);
563                 ev_io_stop(event_base, &IO->recv_event);
564                 ev_timer_stop (event_base, &IO->rw_timeout);
565                 close(IO->SendBuf.fd);
566                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
567         }
568         become_session(IO->CitContext);
569
570         assert(IO->ConnFail);
571         switch (IO->ConnFail(IO))
572         {
573         case eAbort:
574                 ShutDownCLient(IO);
575         default:
576                 break;
577
578         }
579 }
580
581 static void
582 IO_connfailimmediate_callback(struct ev_loop *loop,
583                               ev_idle *watcher,
584                               int revents)
585 {
586         AsyncIO *IO = watcher->data;
587
588         ev_idle_stop (event_base, &IO->conn_fail_immediate);
589
590         if (IO->SendBuf.fd != 0)
591         {
592                 close(IO->SendBuf.fd);
593                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
594         }
595         become_session(IO->CitContext);
596
597         assert(IO->ConnFail);
598         switch (IO->ConnFail(IO))
599         {
600         case eAbort:
601                 ShutDownCLient(IO);
602         default:
603                 break;
604
605         }
606 }
607
608 static void
609 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
610 {
611         AsyncIO *IO = watcher->data;
612
613         ev_io_stop(loop, &IO->conn_event);
614         ev_timer_stop (event_base, &IO->conn_fail);
615         set_start_callback(loop, IO, revents);
616 }
617 static void
618 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
619 {
620         const char *errmsg;
621         ssize_t nbytes;
622         AsyncIO *IO = watcher->data;
623
624         switch (IO->NextState) {
625         case eReadFile:
626                 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
627                 if (nbytes < 0)
628                         StrBufPlain(IO->ErrMsg, errmsg, -1);
629                 else
630                 {
631                         if (IO->IOB.ChunkSendRemain == 0)
632                         {
633                                 IO->NextState = eSendReply;
634                         }
635                         else
636                                 return;
637                 }
638                 break;
639         default:
640                 nbytes = StrBuf_read_one_chunk_callback(watcher->fd,
641                                                         0 /*TODO */,
642                                                         &IO->RecvBuf);
643                 break;
644         }
645
646 #ifdef BIGBAD_IODBG
647         {
648                 long nbytes;
649                 int rv = 0;
650                 char fn [SIZ];
651                 FILE *fd;
652                 const char *pch = ChrPtr(IO->RecvBuf.Buf);
653                 const char *pchh = IO->RecvBuf.ReadWritePointer;
654
655                 if (pchh == NULL)
656                         pchh = pch;
657
658                 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
659                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
660                          ((CitContext*)(IO->CitContext))->ServiceName,
661                          IO->SendBuf.fd);
662
663                 fd = fopen(fn, "a+");
664                 fprintf(fd, "Read: BufSize: %ld BufContent: [",
665                         nbytes);
666                 rv = fwrite(pchh, nbytes, 1, fd);
667                 if (!rv) printf("failed to write debug to %s!\n", fn);
668                 fprintf(fd, "]\n");
669                 fclose(fd);
670         }
671 #endif
672         if (nbytes > 0) {
673                 HandleInbound(IO);
674         } else if (nbytes == 0) {
675                 assert(IO->Timeout);
676
677                 switch (IO->Timeout(IO))
678                 {
679                 case eAbort:
680                         ShutDownCLient(IO);
681                 default:
682                         break;
683                 }
684                 return;
685         } else if (nbytes == -1) {
686 /// TODO: FD is gone. kick it.        sock_buff_invoke_free(sb, errno);
687                 EV_syslog(LOG_DEBUG,
688                           "EVENT: Socket Invalid! %s \n",
689                           strerror(errno));
690                 ShutDownCLient(IO);
691                 return;
692         }
693 }
694
695 void
696 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
697 {
698         AsyncIO *IO = watcher->data;
699         EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
700         become_session(IO->CitContext);
701         assert(IO->DNS.Fail);
702         assert(IO->DNS.Query->PostDNS);
703         switch (IO->DNS.Query->PostDNS(IO))
704         {
705         case eAbort:
706                 switch (IO->DNS.Fail(IO)) {
707                 case eAbort:
708                         ShutDownCLient(IO);
709                 default:
710                         break;
711                 }
712         default:
713                 break;
714         }
715 }
716
717
718 eNextState EvConnectSock(AsyncIO *IO,
719                          double conn_timeout,
720                          double first_rw_timeout,
721                          int ReadFirst)
722 {
723         int fdflags;
724         int rc = -1;
725
726         become_session(IO->CitContext);
727
728         if (ReadFirst) {
729                 IO->NextState = eReadMessage;
730         }
731         else {
732                 IO->NextState = eSendReply;
733         }
734
735         IO->SendBuf.fd = IO->RecvBuf.fd =
736                 socket(
737                         (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
738                         SOCK_STREAM,
739                         IPPROTO_TCP);
740
741         if (IO->SendBuf.fd < 0) {
742                 EV_syslog(LOG_ERR,
743                           "EVENT: socket() failed: %s\n",
744                           strerror(errno));
745
746                 StrBufPrintf(IO->ErrMsg,
747                              "Failed to create socket: %s",
748                              strerror(errno));
749                 return eAbort;
750         }
751         fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
752         if (fdflags < 0) {
753                 EV_syslog(LOG_DEBUG,
754                           "EVENT: unable to get socket flags! %s \n",
755                           strerror(errno));
756                 StrBufPrintf(IO->ErrMsg,
757                              "Failed to get socket flags: %s",
758                              strerror(errno));
759                 return eAbort;
760         }
761         fdflags = fdflags | O_NONBLOCK;
762         if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
763                 EV_syslog(
764                         LOG_DEBUG,
765                         "EVENT: unable to set socket nonblocking flags! %s \n",
766                         strerror(errno));
767                 StrBufPrintf(IO->ErrMsg,
768                              "Failed to set socket flags: %s",
769                              strerror(errno));
770                 close(IO->SendBuf.fd);
771                 IO->SendBuf.fd = IO->RecvBuf.fd = -1;
772                 return eAbort;
773         }
774 /* TODO: maye we could use offsetof() to calc the position of data...
775  * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
776  */
777         ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
778         IO->recv_event.data = IO;
779         ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
780         IO->send_event.data = IO;
781
782         ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
783         IO->conn_fail.data = IO;
784         ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
785         IO->rw_timeout.data = IO;
786
787
788         /* for debugging you may bypass it like this:
789          * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
790          * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
791          *   inet_addr("127.0.0.1");
792          */
793         if (IO->ConnectMe->IPv6)
794                 rc = connect(IO->SendBuf.fd,
795                              &IO->ConnectMe->Addr,
796                              sizeof(struct sockaddr_in6));
797         else
798                 rc = connect(IO->SendBuf.fd,
799                              (struct sockaddr_in *)&IO->ConnectMe->Addr,
800                              sizeof(struct sockaddr_in));
801
802         if (rc >= 0){
803                 EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
804                 set_start_callback(event_base, IO, 0);
805                 ev_timer_start(event_base, &IO->rw_timeout);
806                 return IO->NextState;
807         }
808         else if (errno == EINPROGRESS) {
809                 EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n");
810
811                 ev_io_init(&IO->conn_event,
812                            IO_connestd_callback,
813                            IO->SendBuf.fd,
814                            EV_READ|EV_WRITE);
815
816                 IO->conn_event.data = IO;
817
818                 ev_io_start(event_base, &IO->conn_event);
819                 ev_timer_start(event_base, &IO->conn_fail);
820                 return IO->NextState;
821         }
822         else {
823                 ev_idle_init(&IO->conn_fail_immediate,
824                              IO_connfailimmediate_callback);
825                 IO->conn_fail_immediate.data = IO;
826                 ev_idle_start(event_base, &IO->conn_fail_immediate);
827
828                 EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno));
829                 StrBufPrintf(IO->ErrMsg,
830                              "Failed to connect: %s",
831                              strerror(errno));
832                 return IO->NextState;
833         }
834         return IO->NextState;
835 }
836
837 void SetNextTimeout(AsyncIO *IO, double timeout)
838 {
839         IO->rw_timeout.repeat = timeout;
840         ev_timer_again (event_base,  &IO->rw_timeout);
841 }
842
843
844 eNextState ReAttachIO(AsyncIO *IO,
845                       void *pData,
846                       int ReadFirst)
847 {
848         IO->Data = pData;
849         become_session(IO->CitContext);
850         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
851         if (ReadFirst) {
852                 IO->NextState = eReadMessage;
853         }
854         else {
855                 IO->NextState = eSendReply;
856         }
857         set_start_callback(event_base, IO, 0);
858
859         return IO->NextState;
860 }
861
862 void InitIOStruct(AsyncIO *IO,
863                   void *Data,
864                   eNextState NextState,
865                   IO_LineReaderCallback LineReader,
866                   IO_CallBack DNS_Fail,
867                   IO_CallBack SendDone,
868                   IO_CallBack ReadDone,
869                   IO_CallBack Terminate,
870                   IO_CallBack ConnFail,
871                   IO_CallBack Timeout,
872                   IO_CallBack ShutdownAbort)
873 {
874         IO->Data          = Data;
875
876         IO->CitContext    = CloneContext(CC);
877         ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
878
879         IO->NextState     = NextState;
880
881         IO->SendDone      = SendDone;
882         IO->ReadDone      = ReadDone;
883         IO->Terminate     = Terminate;
884         IO->LineReader    = LineReader;
885         IO->ConnFail      = ConnFail;
886         IO->Timeout       = Timeout;
887         IO->ShutdownAbort = ShutdownAbort;
888
889         IO->DNS.Fail      = DNS_Fail;
890
891         IO->SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
892         IO->RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
893         IO->IOBuf         = NewStrBuf();
894
895 }
896
897 extern int evcurl_init(AsyncIO *IO);
898
899 int InitcURLIOStruct(AsyncIO *IO,
900                      void *Data,
901                      const char* Desc,
902                      IO_CallBack SendDone,
903                      IO_CallBack Terminate,
904                      IO_CallBack ShutdownAbort)
905 {
906         IO->Data          = Data;
907
908         IO->CitContext    = CloneContext(CC);
909         ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
910
911         IO->SendDone = SendDone;
912         IO->Terminate = Terminate;
913         IO->ShutdownAbort = ShutdownAbort;
914
915         strcpy(IO->HttpReq.errdesc, Desc);
916
917
918         return  evcurl_init(IO);
919
920 }