EV: fix possible nullpointer access in last commit.
[citadel.git] / citadel / event_client.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  * This program is open source software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 3.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  */
12
13 #include "sysdep.h"
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <stdio.h>
17 #include <termios.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <pwd.h>
21 #include <errno.h>
22 #include <sys/types.h>
23 #include <syslog.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35 #include <sys/wait.h>
36 #include <ctype.h>
37 #include <string.h>
38 #include <limits.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <assert.h>
43 #if HAVE_BACKTRACE
44 #include <execinfo.h>
45 #endif
46
47 #include <libcitadel.h>
48 #include "citadel.h"
49 #include "server.h"
50 #include "citserver.h"
51 #include "support.h"
52 #include "config.h"
53 #include "control.h"
54 #include "user_ops.h"
55 #include "database.h"
56 #include "msgbase.h"
57 #include "internet_addressing.h"
58 #include "genstamp.h"
59 #include "domain.h"
60 #include "clientsocket.h"
61 #include "locate_host.h"
62 #include "citadel_dirs.h"
63
64 #include "event_client.h"
65 #include "ctdl_module.h"
66
67
68 ConstStr IOStates[] = {
69         {HKEY("DB Queue")},
70         {HKEY("DB Q Next")},
71         {HKEY("DB Attach")},
72         {HKEY("DB Next")},
73         {HKEY("DB Stop")},
74         {HKEY("DB Exit")},
75         {HKEY("DB Terminate")},
76         {HKEY("IO Queue")},
77         {HKEY("IO Attach")},
78         {HKEY("IO Connect Socket")},
79         {HKEY("IO Abort")},
80         {HKEY("IO Timeout")},
81         {HKEY("IO ConnFail")},
82         {HKEY("IO ConnFail Now")},
83         {HKEY("IO Conn Now")},
84         {HKEY("IO Conn Wait")},
85         {HKEY("Curl Q")},
86         {HKEY("Curl Start")},
87         {HKEY("Curl Shotdown")},
88         {HKEY("Curl More IO")},
89         {HKEY("Curl Got IO")},
90         {HKEY("Curl Got Data")},
91         {HKEY("Curl Got Status")},
92         {HKEY("C-Ares Start")},
93         {HKEY("C-Ares IO Done")},
94         {HKEY("C-Ares Finished")},
95         {HKEY("C-Ares exit")},
96         {HKEY("Killing")},
97         {HKEY("Exit")}
98 };
99
100 void SetEVState(AsyncIO *IO, eIOState State)
101 {
102
103         CitContext* CCC = IO->CitContext;
104         if (CCC != NULL)
105                 memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1);
106
107 }
108
109
110 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
111 static void IO_abort_shutdown_callback(struct ev_loop *loop,
112                                        ev_cleanup *watcher,
113                                        int revents);
114
115
116 /*------------------------------------------------------------------------------
117  *                              Server DB IO
118  *----------------------------------------------------------------------------*/
119 extern int evdb_count;
120 extern pthread_mutex_t DBEventQueueMutex;
121 extern pthread_mutex_t DBEventExitQueueMutex;
122 extern HashList *DBInboundEventQueue;
123 extern struct ev_loop *event_db;
124 extern ev_async DBAddJob;
125 extern ev_async DBExitEventLoop;
126
127 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
128 {
129         IOAddHandler *h;
130         int i;
131
132         SetEVState(IO, eDBQ);
133         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
134         h->IO = IO;
135         h->EvAttch = CB;
136         ev_cleanup_init(&IO->db_abort_by_shutdown,
137                         IO_abort_shutdown_callback);
138         IO->db_abort_by_shutdown.data = IO;
139
140         pthread_mutex_lock(&DBEventQueueMutex);
141         if (DBInboundEventQueue == NULL)
142         {
143                 /* shutting down... */
144                 free(h);
145                 EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
146                 pthread_mutex_unlock(&DBEventQueueMutex);
147                 return eAbort;
148         }
149         EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
150         i = ++evdb_count ;
151         Put(DBInboundEventQueue, IKEY(i), h, NULL);
152         pthread_mutex_unlock(&DBEventQueueMutex);
153
154         pthread_mutex_lock(&DBEventExitQueueMutex);
155         if (event_db == NULL)
156         {
157                 pthread_mutex_unlock(&DBEventExitQueueMutex);
158                 return eAbort;
159         }
160         ev_async_send (event_db, &DBAddJob);
161         pthread_mutex_unlock(&DBEventExitQueueMutex);
162
163         EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
164         return eDBQuery;
165 }
166
167 void StopDBWatchers(AsyncIO *IO)
168 {
169         SetEVState(IO, eDBStop);
170         ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
171         ev_idle_stop(event_db, &IO->db_unwind_stack);
172 }
173
174 void ShutDownDBCLient(AsyncIO *IO)
175 {
176         CitContext *Ctx =IO->CitContext;
177         become_session(Ctx);
178
179         SetEVState(IO, eDBTerm);
180         EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
181         StopDBWatchers(IO);
182
183         assert(IO->DBTerminate);
184         IO->DBTerminate(IO);
185 }
186
187 void
188 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
189 {
190         AsyncIO *IO = watcher->data;
191
192         SetEVState(IO, eDBNext);
193         IO->Now = ev_now(event_db);
194         EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
195         become_session(IO->CitContext);
196
197         ev_idle_stop(event_db, &IO->db_unwind_stack);
198
199         assert(IO->NextDBOperation);
200         switch (IO->NextDBOperation(IO))
201         {
202         case eDBQuery:
203                 break;
204         case eSendDNSQuery:
205         case eReadDNSReply:
206         case eConnect:
207         case eSendReply:
208         case eSendMore:
209         case eSendFile:
210         case eReadMessage:
211         case eReadMore:
212         case eReadPayload:
213         case eReadFile:
214                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
215                 break;
216         case eTerminateConnection:
217         case eAbort:
218                 ev_idle_stop(event_db, &IO->db_unwind_stack);
219                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
220                 ShutDownDBCLient(IO);
221         }
222 }
223
224 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
225 {
226         SetEVState(IO, eQDBNext);
227         IO->NextDBOperation = CB;
228         ev_idle_init(&IO->db_unwind_stack,
229                      DB_PerformNext);
230         IO->db_unwind_stack.data = IO;
231         ev_idle_start(event_db, &IO->db_unwind_stack);
232         return eDBQuery;
233 }
234
235 /*------------------------------------------------------------------------------
236  *                      Client IO
237  *----------------------------------------------------------------------------*/
238 extern int evbase_count;
239 extern pthread_mutex_t EventQueueMutex;
240 extern pthread_mutex_t EventExitQueueMutex; 
241 extern HashList *InboundEventQueue;
242 extern struct ev_loop *event_base;
243 extern ev_async AddJob;
244 extern ev_async ExitEventLoop;
245
246 static void IO_abort_shutdown_callback(struct ev_loop *loop,
247                                        ev_cleanup *watcher,
248                                        int revents)
249 {
250         AsyncIO *IO = watcher->data;
251
252         SetEVState(IO, eIOAbort);
253         EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
254         IO->Now = ev_now(event_base);
255         assert(IO->ShutdownAbort);
256         IO->ShutdownAbort(IO);
257 }
258
259
260 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
261 {
262         IOAddHandler *h;
263         int i;
264
265         SetEVState(IO, eIOQ);
266         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
267         h->IO = IO;
268         h->EvAttch = CB;
269         ev_cleanup_init(&IO->abort_by_shutdown,
270                         IO_abort_shutdown_callback);
271         IO->abort_by_shutdown.data = IO;
272
273         pthread_mutex_lock(&EventQueueMutex);
274         if (InboundEventQueue == NULL)
275         {
276                 free(h);
277                 /* shutting down... */
278                 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
279                 pthread_mutex_unlock(&EventQueueMutex);
280                 return eAbort;
281         }
282         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
283         i = ++evbase_count;
284         Put(InboundEventQueue, IKEY(i), h, NULL);
285         pthread_mutex_unlock(&EventQueueMutex);
286
287         pthread_mutex_lock(&EventExitQueueMutex);
288         if (event_base == NULL) {
289                 pthread_mutex_unlock(&EventExitQueueMutex);
290                 return eAbort;
291         }
292         ev_async_send (event_base, &AddJob);
293         pthread_mutex_unlock(&EventExitQueueMutex);
294         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
295         return eSendReply;
296 }
297
298 extern eNextState evcurl_handle_start(AsyncIO *IO);
299
300 eNextState QueueCurlContext(AsyncIO *IO)
301 {
302         IOAddHandler *h;
303         int i;
304
305         SetEVState(IO, eCurlQ);
306         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
307         h->IO = IO;
308         h->EvAttch = evcurl_handle_start;
309
310         pthread_mutex_lock(&EventQueueMutex);
311         if (InboundEventQueue == NULL)
312         {
313                 /* shutting down... */
314                 free(h);
315                 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
316                 pthread_mutex_unlock(&EventQueueMutex);
317                 return eAbort;
318         }
319
320         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
321         i = ++evbase_count;
322         Put(InboundEventQueue, IKEY(i), h, NULL);
323         pthread_mutex_unlock(&EventQueueMutex);
324
325         pthread_mutex_lock(&EventExitQueueMutex);
326         if (event_base == NULL) {
327                 pthread_mutex_unlock(&EventExitQueueMutex);
328                 return eAbort;
329         }
330         ev_async_send (event_base, &AddJob);
331         pthread_mutex_unlock(&EventExitQueueMutex);
332
333         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
334         return eSendReply;
335 }
336
337 void DestructCAres(AsyncIO *IO);
338 void FreeAsyncIOContents(AsyncIO *IO)
339 {
340         CitContext *Ctx = IO->CitContext;
341
342         FreeStrBuf(&IO->IOBuf);
343         FreeStrBuf(&IO->SendBuf.Buf);
344         FreeStrBuf(&IO->RecvBuf.Buf);
345
346         DestructCAres(IO);
347
348         FreeURL(&IO->ConnectMe);
349         FreeStrBuf(&IO->HttpReq.ReplyData);
350
351         if (Ctx) {
352                 Ctx->state = CON_IDLE;
353                 Ctx->kill_me = 1;
354                 IO->CitContext = NULL;
355         }
356 }
357
358
359 void StopClientWatchers(AsyncIO *IO, int CloseFD)
360 {
361         ev_timer_stop (event_base, &IO->rw_timeout);
362         ev_timer_stop(event_base, &IO->conn_fail);
363         ev_idle_stop(event_base, &IO->unwind_stack);
364         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
365
366         ev_io_stop(event_base, &IO->conn_event);
367         ev_io_stop(event_base, &IO->send_event);
368         ev_io_stop(event_base, &IO->recv_event);
369
370         if (CloseFD && (IO->SendBuf.fd > 0)) {
371                 close(IO->SendBuf.fd);
372                 IO->SendBuf.fd = 0;
373                 IO->RecvBuf.fd = 0;
374         }
375 }
376
377 void StopCurlWatchers(AsyncIO *IO)
378 {
379         ev_timer_stop (event_base, &IO->rw_timeout);
380         ev_timer_stop(event_base, &IO->conn_fail);
381         ev_idle_stop(event_base, &IO->unwind_stack);
382         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
383
384         ev_io_stop(event_base, &IO->conn_event);
385         ev_io_stop(event_base, &IO->send_event);
386         ev_io_stop(event_base, &IO->recv_event);
387
388         if (IO->SendBuf.fd != 0) {
389                 close(IO->SendBuf.fd);
390         }
391         IO->SendBuf.fd = 0;
392         IO->RecvBuf.fd = 0;
393 }
394
395 void ShutDownCLient(AsyncIO *IO)
396 {
397         CitContext *Ctx =IO->CitContext;
398
399         SetEVState(IO, eExit);
400         become_session(Ctx);
401
402         EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
403
404         StopClientWatchers(IO, 1);
405
406         if (IO->DNS.Channel != NULL) {
407                 ares_destroy(IO->DNS.Channel);
408                 EV_DNS_LOG_STOP(DNS.recv_event);
409                 EV_DNS_LOG_STOP(DNS.send_event);
410                 ev_io_stop(event_base, &IO->DNS.recv_event);
411                 ev_io_stop(event_base, &IO->DNS.send_event);
412                 IO->DNS.Channel = NULL;
413         }
414         assert(IO->Terminate);
415         IO->Terminate(IO);
416 }
417
418 void PostInbound(AsyncIO *IO)
419 {
420         switch (IO->NextState) {
421         case eSendFile:
422                 ev_io_start(event_base, &IO->send_event);
423                 break;
424         case eSendReply:
425         case eSendMore:
426                 assert(IO->SendDone);
427                 IO->NextState = IO->SendDone(IO);
428                 switch (IO->NextState)
429                 {
430                 case eSendFile:
431                 case eSendReply:
432                 case eSendMore:
433                 case eReadMessage:
434                 case eReadPayload:
435                 case eReadMore:
436                 case eReadFile:
437                         ev_io_start(event_base, &IO->send_event);
438                         break;
439                 case eDBQuery:
440                         StopClientWatchers(IO, 0);
441                 default:
442                         break;
443                 }
444                 break;
445         case eReadPayload:
446         case eReadMore:
447         case eReadFile:
448                 ev_io_start(event_base, &IO->recv_event);
449                 break;
450         case eTerminateConnection:
451                 ShutDownCLient(IO);
452                 break;
453         case eAbort:
454                 ShutDownCLient(IO);
455                 break;
456         case eSendDNSQuery:
457         case eReadDNSReply:
458         case eDBQuery:
459         case eConnect:
460         case eReadMessage:
461                 break;
462         }
463 }
464 eReadState HandleInbound(AsyncIO *IO)
465 {
466         const char *Err = NULL;
467         eReadState Finished = eBufferNotEmpty;
468
469         become_session(IO->CitContext);
470
471         while ((Finished == eBufferNotEmpty) &&
472                ((IO->NextState == eReadMessage)||
473                 (IO->NextState == eReadMore)||
474                 (IO->NextState == eReadFile)||
475                 (IO->NextState == eReadPayload)))
476         {
477                 /* Reading lines...
478                  * lex line reply in callback,
479                  * or do it ourselves.
480                  * i.e. as nnn-blabla means continue reading in SMTP
481                  */
482                 if ((IO->NextState == eReadFile) &&
483                     (Finished == eBufferNotEmpty))
484                 {
485                         Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
486                         if (Finished == eReadSuccess)
487                         {
488                                 IO->NextState = eSendReply;
489                         }
490                 }
491                 else if (IO->LineReader)
492                         Finished = IO->LineReader(IO);
493                 else
494                         Finished = StrBufChunkSipLine(IO->IOBuf,
495                                                       &IO->RecvBuf);
496
497                 switch (Finished) {
498                 case eMustReadMore: /// read new from socket...
499                         break;
500                 case eBufferNotEmpty: /* shouldn't happen... */
501                 case eReadSuccess: /// done for now...
502                         break;
503                 case eReadFail: /// WHUT?
504                                 ///todo: shut down!
505                         break;
506                 }
507
508                 if (Finished != eMustReadMore) {
509                         assert(IO->ReadDone);
510                         ev_io_stop(event_base, &IO->recv_event);
511                         IO->NextState = IO->ReadDone(IO);
512                         Finished = StrBufCheckBuffer(&IO->RecvBuf);
513                 }
514         }
515
516         PostInbound(IO);
517
518         return Finished;
519 }
520
521
522 static void
523 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
524 {
525         int rc;
526         AsyncIO *IO = watcher->data;
527         const char *errmsg = NULL;
528
529         IO->Now = ev_now(event_base);
530         become_session(IO->CitContext);
531 #ifdef BIGBAD_IODBG
532         {
533                 int rv = 0;
534                 char fn [SIZ];
535                 FILE *fd;
536                 const char *pch = ChrPtr(IO->SendBuf.Buf);
537                 const char *pchh = IO->SendBuf.ReadWritePointer;
538                 long nbytes;
539
540                 if (pchh == NULL)
541                         pchh = pch;
542
543                 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
544                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
545                          ((CitContext*)(IO->CitContext))->ServiceName,
546                          IO->SendBuf.fd);
547
548                 fd = fopen(fn, "a+");
549                 fprintf(fd, "Send: BufSize: %ld BufContent: [",
550                         nbytes);
551                 rv = fwrite(pchh, nbytes, 1, fd);
552                 if (!rv) printf("failed to write debug to %s!\n", fn);
553                 fprintf(fd, "]\n");
554 #endif
555                 switch (IO->NextState) {
556                 case eSendFile:
557                         rc = FileSendChunked(&IO->IOB, &errmsg);
558                         if (rc < 0)
559                                 StrBufPlain(IO->ErrMsg, errmsg, -1);
560                         break;
561                 default:
562                         rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd,
563                                                              0,
564                                                              &IO->SendBuf);
565                 }
566
567 #ifdef BIGBAD_IODBG
568                 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
569                 fclose(fd);
570         }
571 #endif
572         if (rc == 0)
573         {
574                 ev_io_stop(event_base, &IO->send_event);
575                 switch (IO->NextState) {
576                 case eSendMore:
577                         assert(IO->SendDone);
578                         IO->NextState = IO->SendDone(IO);
579
580                         if ((IO->NextState == eTerminateConnection) ||
581                             (IO->NextState == eAbort) )
582                                 ShutDownCLient(IO);
583                         else {
584                                 ev_io_start(event_base, &IO->send_event);
585                         }
586                         break;
587                 case eSendFile:
588                         if (IO->IOB.ChunkSendRemain > 0) {
589                                 ev_io_start(event_base, &IO->recv_event);
590                                 SetNextTimeout(IO, 100.0);
591
592                         } else {
593                                 assert(IO->ReadDone);
594                                 IO->NextState = IO->ReadDone(IO);
595                                 switch(IO->NextState) {
596                                 case eSendDNSQuery:
597                                 case eReadDNSReply:
598                                 case eDBQuery:
599                                 case eConnect:
600                                         break;
601                                 case eSendReply:
602                                 case eSendMore:
603                                 case eSendFile:
604                                         ev_io_start(event_base,
605                                                     &IO->send_event);
606                                         break;
607                                 case eReadMessage:
608                                 case eReadMore:
609                                 case eReadPayload:
610                                 case eReadFile:
611                                         break;
612                                 case eTerminateConnection:
613                                 case eAbort:
614                                         break;
615                                 }
616                         }
617                         break;
618                 case eSendReply:
619                     if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
620                         break;
621                     IO->NextState = eReadMore;
622                 case eReadMore:
623                 case eReadMessage:
624                 case eReadPayload:
625                 case eReadFile:
626                         if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
627                         {
628                                 HandleInbound(IO);
629                         }
630                         else {
631                                 ev_io_start(event_base, &IO->recv_event);
632                         }
633
634                         break;
635                 case eDBQuery:
636                         /*
637                          * we now live in another queue,
638                          * so we have to unregister.
639                          */
640                         ev_cleanup_stop(loop, &IO->abort_by_shutdown);
641                         break;
642                 case eSendDNSQuery:
643                 case eReadDNSReply:
644                 case eConnect:
645                 case eTerminateConnection:
646                 case eAbort:
647                         break;
648                 }
649         }
650         else if (rc < 0) {
651                 if (errno != EAGAIN) {
652                         StopClientWatchers(IO, 1);
653                         EV_syslog(LOG_DEBUG,
654                                   "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n",
655                                   errno, strerror(errno), IO->SendBuf.fd);
656                         StrBufPrintf(IO->ErrMsg,
657                                      "Socket Invalid! [%s]",
658                                      strerror(errno));
659                         SetNextTimeout(IO, 0.01);
660                 }
661         }
662         /* else : must write more. */
663 }
664 static void
665 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
666 {
667         ev_timer_stop(event_base, &IO->conn_fail);
668         ev_timer_start(event_base, &IO->rw_timeout);
669
670         switch(IO->NextState) {
671         case eReadMore:
672         case eReadMessage:
673         case eReadFile:
674                 StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0);
675                 ev_io_start(event_base, &IO->recv_event);
676                 break;
677         case eSendReply:
678         case eSendMore:
679         case eReadPayload:
680         case eSendFile:
681                 become_session(IO->CitContext);
682                 IO_send_callback(loop, &IO->send_event, revents);
683                 break;
684         case eDBQuery:
685         case eSendDNSQuery:
686         case eReadDNSReply:
687         case eConnect:
688         case eTerminateConnection:
689         case eAbort:
690                 /// TODO: WHUT?
691                 break;
692         }
693 }
694
695 static void
696 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
697 {
698         AsyncIO *IO = watcher->data;
699
700         SetEVState(IO, eIOTimeout);
701         IO->Now = ev_now(event_base);
702         ev_timer_stop (event_base, &IO->rw_timeout);
703         become_session(IO->CitContext);
704
705         if (IO->SendBuf.fd != 0)
706         {
707                 ev_io_stop(event_base, &IO->send_event);
708                 ev_io_stop(event_base, &IO->recv_event);
709                 ev_timer_stop (event_base, &IO->rw_timeout);
710                 close(IO->SendBuf.fd);
711                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
712         }
713
714         assert(IO->Timeout);
715         switch (IO->Timeout(IO))
716         {
717         case eAbort:
718                 ShutDownCLient(IO);
719         default:
720                 break;
721         }
722 }
723
724 static void
725 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
726 {
727         AsyncIO *IO = watcher->data;
728
729         SetEVState(IO, eIOConnfail);
730         IO->Now = ev_now(event_base);
731         ev_timer_stop (event_base, &IO->conn_fail);
732
733         if (IO->SendBuf.fd != 0)
734         {
735                 ev_io_stop(loop, &IO->conn_event);
736                 ev_io_stop(event_base, &IO->send_event);
737                 ev_io_stop(event_base, &IO->recv_event);
738                 ev_timer_stop (event_base, &IO->rw_timeout);
739                 close(IO->SendBuf.fd);
740                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
741         }
742         become_session(IO->CitContext);
743
744         assert(IO->ConnFail);
745         switch (IO->ConnFail(IO))
746         {
747         case eAbort:
748                 ShutDownCLient(IO);
749         default:
750                 break;
751
752         }
753 }
754
755 static void
756 IO_connfailimmediate_callback(struct ev_loop *loop,
757                               ev_idle *watcher,
758                               int revents)
759 {
760         AsyncIO *IO = watcher->data;
761
762         SetEVState(IO, eIOConnfailNow);
763         IO->Now = ev_now(event_base);
764         ev_idle_stop (event_base, &IO->conn_fail_immediate);
765
766         if (IO->SendBuf.fd != 0)
767         {
768                 close(IO->SendBuf.fd);
769                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
770         }
771         become_session(IO->CitContext);
772
773         assert(IO->ConnFail);
774         switch (IO->ConnFail(IO))
775         {
776         case eAbort:
777                 ShutDownCLient(IO);
778         default:
779                 break;
780
781         }
782 }
783
784 static void
785 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
786 {
787         AsyncIO *IO = watcher->data;
788         int             so_err = 0;
789         socklen_t       lon = sizeof(so_err);
790         int             err;
791
792         SetEVState(IO, eIOConnNow);
793         IO->Now = ev_now(event_base);
794         EVM_syslog(LOG_DEBUG, "connect() succeeded.\n");
795
796         ev_io_stop(loop, &IO->conn_event);
797         ev_timer_stop(event_base, &IO->conn_fail);
798
799         err = getsockopt(IO->SendBuf.fd,
800                          SOL_SOCKET,
801                          SO_ERROR,
802                          (void*)&so_err,
803                          &lon);
804
805         if ((err == 0) && (so_err != 0))
806         {
807                 EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n",
808                           so_err,
809                           strerror(so_err));
810                 IO_connfail_callback(loop, &IO->conn_fail, revents);
811
812         }
813         else
814         {
815                 EVM_syslog(LOG_DEBUG, "connect() succeeded\n");
816                 set_start_callback(loop, IO, revents);
817         }
818 }
819
820 static void
821 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
822 {
823         const char *errmsg;
824         ssize_t nbytes;
825         AsyncIO *IO = watcher->data;
826
827         IO->Now = ev_now(event_base);
828         switch (IO->NextState) {
829         case eReadFile:
830                 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
831                 if (nbytes < 0)
832                         StrBufPlain(IO->ErrMsg, errmsg, -1);
833                 else
834                 {
835                         if (IO->IOB.ChunkSendRemain == 0)
836                         {
837                                 IO->NextState = eSendReply;
838                                 assert(IO->ReadDone);
839                                 ev_io_stop(event_base, &IO->recv_event);
840                                 PostInbound(IO);
841                                 return;
842                         }
843                         else
844                                 return;
845                 }
846                 break;
847         default:
848                 nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd,
849                                                         0,
850                                                         &IO->RecvBuf);
851                 break;
852         }
853
854 #ifdef BIGBAD_IODBG
855         {
856                 long nbytes;
857                 int rv = 0;
858                 char fn [SIZ];
859                 FILE *fd;
860                 const char *pch = ChrPtr(IO->RecvBuf.Buf);
861                 const char *pchh = IO->RecvBuf.ReadWritePointer;
862
863                 if (pchh == NULL)
864                         pchh = pch;
865
866                 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
867                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
868                          ((CitContext*)(IO->CitContext))->ServiceName,
869                          IO->SendBuf.fd);
870
871                 fd = fopen(fn, "a+");
872                 fprintf(fd, "Read: BufSize: %ld BufContent: [",
873                         nbytes);
874                 rv = fwrite(pchh, nbytes, 1, fd);
875                 if (!rv) printf("failed to write debug to %s!\n", fn);
876                 fprintf(fd, "]\n");
877                 fclose(fd);
878         }
879 #endif
880         if (nbytes > 0) {
881                 HandleInbound(IO);
882         } else if (nbytes == 0) {
883                 StopClientWatchers(IO, 1);
884                 SetNextTimeout(IO, 0.01);
885                 return;
886         } else if (nbytes == -1) {
887                 if (errno != EAGAIN) {
888                         // FD is gone. kick it. 
889                         StopClientWatchers(IO, 1);
890                         EV_syslog(LOG_DEBUG,
891                                   "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n",
892                                   errno, strerror(errno), IO->SendBuf.fd);
893                         StrBufPrintf(IO->ErrMsg,
894                                      "Socket Invalid! [%s]",
895                                      strerror(errno));
896                         SetNextTimeout(IO, 0.01);
897                 }
898                 return;
899         }
900 }
901
902 void
903 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
904 {
905         AsyncIO *IO = watcher->data;
906
907         SetEVState(IO, eCaresFinished);
908         IO->Now = ev_now(event_base);
909         EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
910         become_session(IO->CitContext);
911         assert(IO->DNS.Query->PostDNS);
912         switch (IO->DNS.Query->PostDNS(IO))
913         {
914         case eAbort:
915                 assert(IO->DNS.Fail);
916                 switch (IO->DNS.Fail(IO)) {
917                 case eAbort:
918 ////                    StopClientWatchers(IO);
919                         ShutDownCLient(IO);
920                 default:
921                         break;
922                 }
923         default:
924                 break;
925         }
926 }
927
928
929 eNextState EvConnectSock(AsyncIO *IO,
930                          double conn_timeout,
931                          double first_rw_timeout,
932                          int ReadFirst)
933 {
934         struct sockaddr_in egress_sin;
935         int fdflags;
936         int rc = -1;
937
938         SetEVState(IO, eIOConnectSock);
939         become_session(IO->CitContext);
940
941         if (ReadFirst) {
942                 IO->NextState = eReadMessage;
943         }
944         else {
945                 IO->NextState = eSendReply;
946         }
947
948         IO->SendBuf.fd = IO->RecvBuf.fd =
949                 socket(
950                         (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
951                         SOCK_STREAM,
952                         IPPROTO_TCP);
953
954         if (IO->SendBuf.fd < 0) {
955                 EV_syslog(LOG_ERR,
956                           "EVENT: socket() failed: %s\n",
957                           strerror(errno));
958
959                 StrBufPrintf(IO->ErrMsg,
960                              "Failed to create socket: %s",
961                              strerror(errno));
962                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
963                 return eAbort;
964         }
965         fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
966         if (fdflags < 0) {
967                 EV_syslog(LOG_ERR,
968                           "EVENT: unable to get socket %d flags! %s \n",
969                           IO->SendBuf.fd,
970                           strerror(errno));
971                 StrBufPrintf(IO->ErrMsg,
972                              "Failed to get socket %d flags: %s",
973                              IO->SendBuf.fd,
974                              strerror(errno));
975                 close(IO->SendBuf.fd);
976                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
977                 return eAbort;
978         }
979         fdflags = fdflags | O_NONBLOCK;
980         if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
981                 EV_syslog(
982                         LOG_ERR,
983                         "EVENT: unable to set socket %d nonblocking flags! %s \n",
984                         IO->SendBuf.fd,
985                         strerror(errno));
986                 StrBufPrintf(IO->ErrMsg,
987                              "Failed to set socket flags: %s",
988                              strerror(errno));
989                 close(IO->SendBuf.fd);
990                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
991                 return eAbort;
992         }
993 /* TODO: maye we could use offsetof() to calc the position of data...
994  * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
995  */
996         ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
997         IO->recv_event.data = IO;
998         ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
999         IO->send_event.data = IO;
1000
1001         ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
1002         IO->conn_fail.data = IO;
1003         ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
1004         IO->rw_timeout.data = IO;
1005
1006
1007
1008
1009         /* for debugging you may bypass it like this:
1010          * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
1011          * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
1012          *   inet_addr("127.0.0.1");
1013          */
1014         if (IO->ConnectMe->IPv6) {
1015                 rc = connect(IO->SendBuf.fd,
1016                              &IO->ConnectMe->Addr,
1017                              sizeof(struct sockaddr_in6));
1018         }
1019         else {
1020                 /* If citserver is bound to a specific IP address on the host, make
1021                  * sure we use that address for outbound connections.
1022                  */
1023         
1024                 memset(&egress_sin, 0, sizeof(egress_sin));
1025                 egress_sin.sin_family = AF_INET;
1026                 if (!IsEmptyStr(config.c_ip_addr)) {
1027                         egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
1028                         if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
1029                                 egress_sin.sin_addr.s_addr = INADDR_ANY;
1030                         }
1031
1032                         /* If this bind fails, no problem; we can still use INADDR_ANY */
1033                         bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
1034                 }
1035                 rc = connect(IO->SendBuf.fd,
1036                              (struct sockaddr_in *)&IO->ConnectMe->Addr,
1037                              sizeof(struct sockaddr_in));
1038         }
1039
1040         if (rc >= 0){
1041                 SetEVState(IO, eIOConnNow);
1042                 EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd);
1043                 set_start_callback(event_base, IO, 0);
1044                 return IO->NextState;
1045         }
1046         else if (errno == EINPROGRESS) {
1047                 SetEVState(IO, eIOConnWait);
1048                 EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd);
1049
1050                 ev_io_init(&IO->conn_event,
1051                            IO_connestd_callback,
1052                            IO->SendBuf.fd,
1053                            EV_READ|EV_WRITE);
1054
1055                 IO->conn_event.data = IO;
1056
1057                 ev_io_start(event_base, &IO->conn_event);
1058                 ev_timer_start(event_base, &IO->conn_fail);
1059                 return IO->NextState;
1060         }
1061         else {
1062                 SetEVState(IO, eIOConnfail);
1063                 ev_idle_init(&IO->conn_fail_immediate,
1064                              IO_connfailimmediate_callback);
1065                 IO->conn_fail_immediate.data = IO;
1066                 ev_idle_start(event_base, &IO->conn_fail_immediate);
1067
1068                 EV_syslog(LOG_ERR,
1069                           "connect() = %d failed: %s\n",
1070                           IO->SendBuf.fd,
1071                           strerror(errno));
1072
1073                 StrBufPrintf(IO->ErrMsg,
1074                              "Failed to connect: %s",
1075                              strerror(errno));
1076                 return IO->NextState;
1077         }
1078         return IO->NextState;
1079 }
1080
1081 void SetNextTimeout(AsyncIO *IO, double timeout)
1082 {
1083         IO->rw_timeout.repeat = timeout;
1084         ev_timer_again (event_base,  &IO->rw_timeout);
1085 }
1086
1087
1088 eNextState ReAttachIO(AsyncIO *IO,
1089                       void *pData,
1090                       int ReadFirst)
1091 {
1092         SetEVState(IO, eIOAttach);
1093         IO->Data = pData;
1094         become_session(IO->CitContext);
1095         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
1096         if (ReadFirst) {
1097                 IO->NextState = eReadMessage;
1098         }
1099         else {
1100                 IO->NextState = eSendReply;
1101         }
1102         set_start_callback(event_base, IO, 0);
1103
1104         return IO->NextState;
1105 }
1106
1107 void InitIOStruct(AsyncIO *IO,
1108                   void *Data,
1109                   eNextState NextState,
1110                   IO_LineReaderCallback LineReader,
1111                   IO_CallBack DNS_Fail,
1112                   IO_CallBack SendDone,
1113                   IO_CallBack ReadDone,
1114                   IO_CallBack Terminate,
1115                   IO_CallBack DBTerminate,
1116                   IO_CallBack ConnFail,
1117                   IO_CallBack Timeout,
1118                   IO_CallBack ShutdownAbort)
1119 {
1120         IO->Data          = Data;
1121
1122         IO->CitContext    = CloneContext(CC);
1123         IO->CitContext->session_specific_data = Data;
1124         IO->CitContext->IO = IO;
1125
1126         IO->NextState     = NextState;
1127
1128         IO->SendDone      = SendDone;
1129         IO->ReadDone      = ReadDone;
1130         IO->Terminate     = Terminate;
1131         IO->DBTerminate   = DBTerminate;
1132         IO->LineReader    = LineReader;
1133         IO->ConnFail      = ConnFail;
1134         IO->Timeout       = Timeout;
1135         IO->ShutdownAbort = ShutdownAbort;
1136
1137         IO->DNS.Fail      = DNS_Fail;
1138
1139         IO->SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
1140         IO->RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
1141         IO->IOBuf         = NewStrBuf();
1142         EV_syslog(LOG_DEBUG,
1143                   "EVENT: Session lives at %p IO at %p \n",
1144                   Data, IO);
1145
1146 }
1147
1148 extern int evcurl_init(AsyncIO *IO);
1149
1150 int InitcURLIOStruct(AsyncIO *IO,
1151                      void *Data,
1152                      const char* Desc,
1153                      IO_CallBack SendDone,
1154                      IO_CallBack Terminate,
1155                      IO_CallBack DBTerminate,
1156                      IO_CallBack ShutdownAbort)
1157 {
1158         IO->Data          = Data;
1159
1160         IO->CitContext    = CloneContext(CC);
1161         IO->CitContext->session_specific_data = Data;
1162         IO->CitContext->IO = IO;
1163
1164         IO->SendDone      = SendDone;
1165         IO->Terminate     = Terminate;
1166         IO->DBTerminate   = DBTerminate;
1167         IO->ShutdownAbort = ShutdownAbort;
1168
1169         strcpy(IO->HttpReq.errdesc, Desc);
1170
1171
1172         return  evcurl_init(IO);
1173
1174 }
1175
1176
1177 typedef struct KillOtherSessionContext {
1178         AsyncIO IO;
1179         AsyncIO *OtherOne;
1180 }KillOtherSessionContext;
1181
1182 eNextState KillTerminate(AsyncIO *IO)
1183 {
1184         long id;
1185         KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data;
1186         EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__);
1187         id = IO->ID;
1188         FreeAsyncIOContents(IO);
1189         memset(Ctx, 0, sizeof(KillOtherSessionContext));
1190         IO->ID = id; /* just for the case we want to analyze it in a coredump */
1191         free(Ctx);
1192         return eAbort;
1193
1194 }
1195
1196 eNextState KillShutdown(AsyncIO *IO)
1197 {
1198         return eTerminateConnection;
1199 }
1200
1201 eNextState KillOtherContextNow(AsyncIO *IO)
1202 {
1203         KillOtherSessionContext *Ctx = IO->Data;
1204
1205         SetEVState(IO, eKill);
1206
1207         if (Ctx->OtherOne->ShutdownAbort != NULL)
1208                 Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne);
1209         return eTerminateConnection;
1210 }
1211
1212 void KillAsyncIOContext(AsyncIO *IO)
1213 {
1214         KillOtherSessionContext *Ctx;
1215
1216         Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext));
1217         memset(Ctx, 0, sizeof(KillOtherSessionContext));
1218         
1219         InitIOStruct(&Ctx->IO,
1220                      Ctx,
1221                      eReadMessage,
1222                      NULL,
1223                      NULL,
1224                      NULL,
1225                      NULL,
1226                      KillTerminate,
1227                      NULL,
1228                      NULL,
1229                      NULL,
1230                      KillShutdown);
1231
1232         Ctx->OtherOne = IO;
1233
1234         switch(IO->NextState) {
1235         case eSendDNSQuery:
1236         case eReadDNSReply:
1237
1238         case eConnect:
1239         case eSendReply:
1240         case eSendMore:
1241         case eSendFile:
1242
1243         case eReadMessage:
1244         case eReadMore:
1245         case eReadPayload:
1246         case eReadFile:
1247                 QueueEventContext(&Ctx->IO, KillOtherContextNow);
1248                 break;
1249         case eDBQuery:
1250                 QueueDBOperation(&Ctx->IO, KillOtherContextNow);
1251                 break;
1252         case eTerminateConnection:
1253         case eAbort:
1254                 /*hm, its already dying, dunno which Queue its in... */
1255                 free(Ctx);
1256         }
1257         
1258 }
1259
1260 extern int DebugEventLoopBacktrace;
1261 void EV_backtrace(AsyncIO *IO)
1262 {
1263 #ifdef HAVE_BACKTRACE
1264         void *stack_frames[50];
1265         size_t size, i;
1266         char **strings;
1267
1268         if ((IO == NULL) || (DebugEventLoopBacktrace == 0))
1269                 return;
1270         size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
1271         strings = backtrace_symbols(stack_frames, size);
1272         for (i = 0; i < size; i++) {
1273                 if (strings != NULL) {
1274                         EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
1275                 }
1276                 else {
1277                         EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
1278                 }
1279         }
1280         free(strings);
1281 #endif
1282 }
1283
1284
1285 ev_tstamp ctdl_ev_now (void)
1286 {
1287         return ev_now(event_base);
1288 }