fad9eefeccaa5dea30651ba65cc482a1baa19d65
[citadel.git] / citadel / event_client.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  * This program is open source software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 3.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  */
12 #include "sysdep.h"
13
14 #include <stdio.h>
15 #include <string.h>
16 #include <syslog.h>
17 #include <assert.h>
18 #include <sys/socket.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21 #if HAVE_BACKTRACE
22 #include <execinfo.h>
23 #endif
24
25 #include <libcitadel.h>
26
27 #include "ctdl_module.h"
28 #include "event_client.h"
29 #include "citserver.h"
30 #include "config.h"
31
32 ConstStr IOStates[] = {
33         {HKEY("DB Queue")},
34         {HKEY("DB Q Next")},
35         {HKEY("DB Attach")},
36         {HKEY("DB Next")},
37         {HKEY("DB Stop")},
38         {HKEY("DB Exit")},
39         {HKEY("DB Terminate")},
40         {HKEY("IO Queue")},
41         {HKEY("IO Attach")},
42         {HKEY("IO Connect Socket")},
43         {HKEY("IO Abort")},
44         {HKEY("IO Timeout")},
45         {HKEY("IO ConnFail")},
46         {HKEY("IO ConnFail Now")},
47         {HKEY("IO Conn Now")},
48         {HKEY("IO Conn Wait")},
49         {HKEY("Curl Q")},
50         {HKEY("Curl Start")},
51         {HKEY("Curl Shotdown")},
52         {HKEY("Curl More IO")},
53         {HKEY("Curl Got IO")},
54         {HKEY("Curl Got Data")},
55         {HKEY("Curl Got Status")},
56         {HKEY("C-Ares Start")},
57         {HKEY("C-Ares IO Done")},
58         {HKEY("C-Ares Finished")},
59         {HKEY("C-Ares exit")},
60         {HKEY("Killing")},
61         {HKEY("Exit")}
62 };
63
64 void SetEVState(AsyncIO *IO, eIOState State)
65 {
66
67         CitContext* CCC = IO->CitContext;
68         if (CCC != NULL)
69                 memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1);
70
71 }
72
73 eNextState QueueAnEventContext(AsyncIO *IO);
74 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
75 static void IO_abort_shutdown_callback(struct ev_loop *loop,
76                                        ev_cleanup *watcher,
77                                        int revents);
78
79
80 /*------------------------------------------------------------------------------
81  *                              Server DB IO
82  *----------------------------------------------------------------------------*/
83 extern int evdb_count;
84 extern pthread_mutex_t DBEventQueueMutex;
85 extern pthread_mutex_t DBEventExitQueueMutex;
86 extern HashList *DBInboundEventQueue;
87 extern struct ev_loop *event_db;
88 extern ev_async DBAddJob;
89 extern ev_async DBExitEventLoop;
90
91 eNextState QueueAnDBOperation(AsyncIO *IO)
92 {
93         IOAddHandler *h;
94         int i;
95
96         SetEVState(IO, eDBQ);
97         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
98         h->IO = IO;
99
100         assert(IO->ReAttachCB != NULL);
101
102         h->EvAttch = IO->ReAttachCB;
103         ev_cleanup_init(&IO->db_abort_by_shutdown,
104                         IO_abort_shutdown_callback);
105         IO->db_abort_by_shutdown.data = IO;
106
107         pthread_mutex_lock(&DBEventQueueMutex);
108         if (DBInboundEventQueue == NULL)
109         {
110                 /* shutting down... */
111                 free(h);
112                 EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
113                 pthread_mutex_unlock(&DBEventQueueMutex);
114                 return eAbort;
115         }
116         EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
117         i = ++evdb_count ;
118         Put(DBInboundEventQueue, IKEY(i), h, NULL);
119         pthread_mutex_unlock(&DBEventQueueMutex);
120
121         pthread_mutex_lock(&DBEventExitQueueMutex);
122         if (event_db == NULL)
123         {
124                 pthread_mutex_unlock(&DBEventExitQueueMutex);
125                 return eAbort;
126         }
127         ev_async_send (event_db, &DBAddJob);
128         pthread_mutex_unlock(&DBEventExitQueueMutex);
129
130         EVQM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
131         return eDBQuery;
132 }
133
134 void StopDBWatchers(AsyncIO *IO)
135 {
136         SetEVState(IO, eDBStop);
137         ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
138         ev_idle_stop(event_db, &IO->db_unwind_stack);
139 }
140
141 void ShutDownDBCLient(AsyncIO *IO)
142 {
143         CitContext *Ctx =IO->CitContext;
144         become_session(Ctx);
145
146         SetEVState(IO, eDBTerm);
147         EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
148         StopDBWatchers(IO);
149
150         assert(IO->DBTerminate);
151         IO->DBTerminate(IO);
152 }
153
154 void
155 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
156 {
157         AsyncIO *IO = watcher->data;
158
159         SetEVState(IO, eDBNext);
160         IO->CitContext->lastcmd = IO->Now = ev_now(event_db);
161         EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
162         become_session(IO->CitContext);
163
164         ev_idle_stop(event_db, &IO->db_unwind_stack);
165
166         assert(IO->NextDBOperation);
167         switch (IO->NextDBOperation(IO))
168         {
169         case eSendReply:
170                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
171                 QueueAnEventContext(IO);
172                 break;
173         case eDBQuery:
174                 break;
175         case eSendDNSQuery:
176         case eReadDNSReply:
177         case eConnect:
178         case eSendMore:
179         case eSendFile:
180         case eReadMessage:
181         case eReadMore:
182         case eReadPayload:
183         case eReadFile:
184                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
185                 break;
186         case eTerminateConnection:
187         case eAbort:
188                 ev_idle_stop(event_db, &IO->db_unwind_stack);
189                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
190                 ShutDownDBCLient(IO);
191         }
192 }
193
194 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
195 {
196         SetEVState(IO, eQDBNext);
197         IO->NextDBOperation = CB;
198         ev_idle_init(&IO->db_unwind_stack,
199                      DB_PerformNext);
200         IO->db_unwind_stack.data = IO;
201         ev_idle_start(event_db, &IO->db_unwind_stack);
202         return eDBQuery;
203 }
204
205 /*------------------------------------------------------------------------------
206  *                      Client IO
207  *----------------------------------------------------------------------------*/
208 extern int evbase_count;
209 extern pthread_mutex_t EventQueueMutex;
210 extern pthread_mutex_t EventExitQueueMutex; 
211 extern HashList *InboundEventQueue;
212 extern struct ev_loop *event_base;
213 extern ev_async AddJob;
214 extern ev_async ExitEventLoop;
215
216 static void IO_abort_shutdown_callback(struct ev_loop *loop,
217                                        ev_cleanup *watcher,
218                                        int revents)
219 {
220         AsyncIO *IO = watcher->data;
221
222         SetEVState(IO, eIOAbort);
223         EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
224         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
225         assert(IO->ShutdownAbort);
226         IO->ShutdownAbort(IO);
227 }
228
229
230 eNextState QueueAnEventContext(AsyncIO *IO)
231 {
232         IOAddHandler *h;
233         int i;
234
235         SetEVState(IO, eIOQ);
236         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
237         h->IO = IO;
238
239         assert(IO->ReAttachCB != NULL);
240
241         h->EvAttch = IO->ReAttachCB;
242
243         ev_cleanup_init(&IO->abort_by_shutdown,
244                         IO_abort_shutdown_callback);
245         IO->abort_by_shutdown.data = IO;
246
247         pthread_mutex_lock(&EventQueueMutex);
248         if (InboundEventQueue == NULL)
249         {
250                 free(h);
251                 /* shutting down... */
252                 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
253                 pthread_mutex_unlock(&EventQueueMutex);
254                 return eAbort;
255         }
256         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
257         i = ++evbase_count;
258         Put(InboundEventQueue, IKEY(i), h, NULL);
259         pthread_mutex_unlock(&EventQueueMutex);
260
261         pthread_mutex_lock(&EventExitQueueMutex);
262         if (event_base == NULL) {
263                 pthread_mutex_unlock(&EventExitQueueMutex);
264                 return eAbort;
265         }
266         ev_async_send (event_base, &AddJob);
267         pthread_mutex_unlock(&EventExitQueueMutex);
268         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
269         return eSendReply;
270 }
271
272 eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB, int CloseFDs)
273 {
274         StopClientWatchers(IO, CloseFDs);
275         IO->ReAttachCB = CB;
276         return eDBQuery;
277 }
278 eNextState DBQueueEventContext(AsyncIO *IO, IO_CallBack CB)
279 {
280         StopDBWatchers(IO);
281         IO->ReAttachCB = CB;
282         return eSendReply;
283 }
284
285 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
286 {
287         IO->ReAttachCB = CB;
288         return QueueAnEventContext(IO);
289 }
290
291 extern eNextState evcurl_handle_start(AsyncIO *IO);
292
293 eNextState QueueCurlContext(AsyncIO *IO)
294 {
295         IOAddHandler *h;
296         int i;
297
298         SetEVState(IO, eCurlQ);
299         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
300         h->IO = IO;
301         h->EvAttch = evcurl_handle_start;
302
303         pthread_mutex_lock(&EventQueueMutex);
304         if (InboundEventQueue == NULL)
305         {
306                 /* shutting down... */
307                 free(h);
308                 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
309                 pthread_mutex_unlock(&EventQueueMutex);
310                 return eAbort;
311         }
312
313         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
314         i = ++evbase_count;
315         Put(InboundEventQueue, IKEY(i), h, NULL);
316         pthread_mutex_unlock(&EventQueueMutex);
317
318         pthread_mutex_lock(&EventExitQueueMutex);
319         if (event_base == NULL) {
320                 pthread_mutex_unlock(&EventExitQueueMutex);
321                 return eAbort;
322         }
323         ev_async_send (event_base, &AddJob);
324         pthread_mutex_unlock(&EventExitQueueMutex);
325
326         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
327         return eSendReply;
328 }
329
330 eNextState CurlQueueDBOperation(AsyncIO *IO, IO_CallBack CB)
331 {
332         StopCurlWatchers(IO);
333         IO->ReAttachCB = CB;
334         return eDBQuery;
335 }
336
337
338 void FreeAsyncIOContents(AsyncIO *IO)
339 {
340         CitContext *Ctx = IO->CitContext;
341
342         FreeStrBuf(&IO->IOBuf);
343         FreeStrBuf(&IO->SendBuf.Buf);
344         FreeStrBuf(&IO->RecvBuf.Buf);
345
346         FreeURL(&IO->ConnectMe);
347         FreeStrBuf(&IO->HttpReq.ReplyData);
348
349         if (Ctx) {
350                 Ctx->state = CON_IDLE;
351                 Ctx->kill_me = 1;
352                 IO->CitContext = NULL;
353         }
354 }
355
356
357 void DestructCAres(AsyncIO *IO);
358 void StopClientWatchers(AsyncIO *IO, int CloseFD)
359 {
360         EVM_syslog(LOG_DEBUG, "EVENT StopClientWatchers");
361         
362         DestructCAres(IO);
363
364         ev_timer_stop (event_base, &IO->rw_timeout);
365         ev_timer_stop(event_base, &IO->conn_fail);
366         ev_idle_stop(event_base, &IO->unwind_stack);
367         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
368
369         ev_io_stop(event_base, &IO->conn_event);
370         ev_io_stop(event_base, &IO->send_event);
371         ev_io_stop(event_base, &IO->recv_event);
372
373         if (CloseFD && (IO->SendBuf.fd > 0)) {
374                 close(IO->SendBuf.fd);
375                 IO->SendBuf.fd = 0;
376                 IO->RecvBuf.fd = 0;
377         }
378 }
379
380 void StopCurlWatchers(AsyncIO *IO)
381 {
382         EVM_syslog(LOG_DEBUG, "EVENT StopCurlWatchers \n");
383
384         ev_timer_stop (event_base, &IO->rw_timeout);
385         ev_timer_stop(event_base, &IO->conn_fail);
386         ev_idle_stop(event_base, &IO->unwind_stack);
387         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
388
389         ev_io_stop(event_base, &IO->conn_event);
390         ev_io_stop(event_base, &IO->send_event);
391         ev_io_stop(event_base, &IO->recv_event);
392
393         curl_easy_cleanup(IO->HttpReq.chnd);
394         IO->HttpReq.chnd = NULL;
395
396         if (IO->SendBuf.fd != 0) {
397                 close(IO->SendBuf.fd);
398         }
399         IO->SendBuf.fd = 0;
400         IO->RecvBuf.fd = 0;
401 }
402
403 eNextState ShutDownCLient(AsyncIO *IO)
404 {
405         CitContext *Ctx =IO->CitContext;
406
407         SetEVState(IO, eExit);
408         become_session(Ctx);
409
410         EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
411
412         StopClientWatchers(IO, 1);
413
414         if (IO->DNS.Channel != NULL) {
415                 ares_destroy(IO->DNS.Channel);
416                 EV_DNS_LOG_STOP(DNS.recv_event);
417                 EV_DNS_LOG_STOP(DNS.send_event);
418                 ev_io_stop(event_base, &IO->DNS.recv_event);
419                 ev_io_stop(event_base, &IO->DNS.send_event);
420                 IO->DNS.Channel = NULL;
421         }
422         assert(IO->Terminate);
423         return IO->Terminate(IO);
424 }
425
426 void PostInbound(AsyncIO *IO)
427 {
428
429         switch (IO->NextState) {
430         case eSendFile:
431                 ev_io_start(event_base, &IO->send_event);
432                 break;
433         case eSendReply:
434         case eSendMore:
435                 assert(IO->SendDone);
436                 IO->NextState = IO->SendDone(IO);
437                 switch (IO->NextState)
438                 {
439                 case eSendFile:
440                 case eSendReply:
441                 case eSendMore:
442                 case eReadMessage:
443                 case eReadPayload:
444                 case eReadMore:
445                 case eReadFile:
446                         ev_io_start(event_base, &IO->send_event);
447                         break;
448                 case eDBQuery:
449                         StopClientWatchers(IO, 0);
450                         QueueAnDBOperation(IO);
451                 default:
452                         break;
453                 }
454                 break;
455         case eReadPayload:
456         case eReadMore:
457         case eReadFile:
458                 ev_io_start(event_base, &IO->recv_event);
459                 break;
460         case eTerminateConnection:
461         case eAbort:
462                 if (ShutDownCLient(IO) == eDBQuery) {
463                         QueueAnDBOperation(IO);
464                 }
465                 break;
466         case eSendDNSQuery:
467         case eReadDNSReply:
468         case eConnect:
469         case eReadMessage:
470                 break;
471         case eDBQuery:
472                 QueueAnDBOperation(IO);
473         }
474 }
475 eReadState HandleInbound(AsyncIO *IO)
476 {
477         const char *Err = NULL;
478         eReadState Finished = eBufferNotEmpty;
479
480         become_session(IO->CitContext);
481
482         while ((Finished == eBufferNotEmpty) &&
483                ((IO->NextState == eReadMessage)||
484                 (IO->NextState == eReadMore)||
485                 (IO->NextState == eReadFile)||
486                 (IO->NextState == eReadPayload)))
487         {
488                 /* Reading lines...
489                  * lex line reply in callback,
490                  * or do it ourselves.
491                  * i.e. as nnn-blabla means continue reading in SMTP
492                  */
493                 if ((IO->NextState == eReadFile) &&
494                     (Finished == eBufferNotEmpty))
495                 {
496                         Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
497                         if (Finished == eReadSuccess)
498                         {
499                                 IO->NextState = eSendReply;
500                         }
501                 }
502                 else if (IO->LineReader)
503                         Finished = IO->LineReader(IO);
504                 else
505                         Finished = StrBufChunkSipLine(IO->IOBuf,
506                                                       &IO->RecvBuf);
507
508                 switch (Finished) {
509                 case eMustReadMore: /// read new from socket...
510                         break;
511                 case eBufferNotEmpty: /* shouldn't happen... */
512                 case eReadSuccess: /// done for now...
513                         break;
514                 case eReadFail: /// WHUT?
515                                 ///todo: shut down!
516                         break;
517                 }
518
519                 if (Finished != eMustReadMore) {
520                         ev_io_stop(event_base, &IO->recv_event);
521                         IO->NextState = IO->ReadDone(IO);
522                         if  (IO->NextState == eDBQuery) {
523                                 if (QueueAnDBOperation(IO) == eAbort)
524                                         return eReadFail;
525                                 else
526                                         return eReadSuccess;
527                         }
528                         else {
529                                 Finished = StrBufCheckBuffer(&IO->RecvBuf);
530                         }
531                 }
532         }
533
534         PostInbound(IO);
535
536         return Finished;
537 }
538
539
540 static void
541 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
542 {
543         int rc;
544         AsyncIO *IO = watcher->data;
545         const char *errmsg = NULL;
546
547         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
548         become_session(IO->CitContext);
549 #ifdef BIGBAD_IODBG
550         {
551                 int rv = 0;
552                 char fn [SIZ];
553                 FILE *fd;
554                 const char *pch = ChrPtr(IO->SendBuf.Buf);
555                 const char *pchh = IO->SendBuf.ReadWritePointer;
556                 long nbytes;
557
558                 if (pchh == NULL)
559                         pchh = pch;
560
561                 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
562                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
563                          ((CitContext*)(IO->CitContext))->ServiceName,
564                          IO->SendBuf.fd);
565
566                 fd = fopen(fn, "a+");
567                 if (fd == NULL) {
568                         syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno));
569                         cit_backtrace();
570                         exit(1);
571                 }
572                 fprintf(fd, "Send: BufSize: %ld BufContent: [",
573                         nbytes);
574                 rv = fwrite(pchh, nbytes, 1, fd);
575                 if (!rv) printf("failed to write debug to %s!\n", fn);
576                 fprintf(fd, "]\n");
577 #endif
578                 switch (IO->NextState) {
579                 case eSendFile:
580                         rc = FileSendChunked(&IO->IOB, &errmsg);
581                         if (rc < 0)
582                                 StrBufPlain(IO->ErrMsg, errmsg, -1);
583                         break;
584                 default:
585                         rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd,
586                                                              0,
587                                                              &IO->SendBuf);
588                 }
589
590 #ifdef BIGBAD_IODBG
591                 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
592                 fclose(fd);
593         }
594 #endif
595         if (rc == 0)
596         {
597                 ev_io_stop(event_base, &IO->send_event);
598                 switch (IO->NextState) {
599                 case eSendMore:
600                         assert(IO->SendDone);
601                         IO->NextState = IO->SendDone(IO);
602
603                         if ((IO->NextState == eTerminateConnection) ||
604                             (IO->NextState == eAbort) )
605                                 ShutDownCLient(IO);
606                         else {
607                                 ev_io_start(event_base, &IO->send_event);
608                         }
609                         break;
610                 case eSendFile:
611                         if (IO->IOB.ChunkSendRemain > 0) {
612                                 ev_io_start(event_base, &IO->recv_event);
613                                 SetNextTimeout(IO, 100.0);
614
615                         } else {
616                                 assert(IO->ReadDone);
617                                 IO->NextState = IO->ReadDone(IO);
618                                 switch(IO->NextState) {
619                                 case eSendDNSQuery:
620                                 case eReadDNSReply:
621                                 case eDBQuery:
622                                 case eConnect:
623                                         break;
624                                 case eSendReply:
625                                 case eSendMore:
626                                 case eSendFile:
627                                         ev_io_start(event_base,
628                                                     &IO->send_event);
629                                         break;
630                                 case eReadMessage:
631                                 case eReadMore:
632                                 case eReadPayload:
633                                 case eReadFile:
634                                         break;
635                                 case eTerminateConnection:
636                                 case eAbort:
637                                         break;
638                                 }
639                         }
640                         break;
641                 case eSendReply:
642                     if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
643                         break;
644                     IO->NextState = eReadMore;
645                 case eReadMore:
646                 case eReadMessage:
647                 case eReadPayload:
648                 case eReadFile:
649                         if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
650                         {
651                                 HandleInbound(IO);
652                         }
653                         else {
654                                 ev_io_start(event_base, &IO->recv_event);
655                         }
656
657                         break;
658                 case eDBQuery:
659                         /*
660                          * we now live in another queue,
661                          * so we have to unregister.
662                          */
663                         ev_cleanup_stop(loop, &IO->abort_by_shutdown);
664                         break;
665                 case eSendDNSQuery:
666                 case eReadDNSReply:
667                 case eConnect:
668                 case eTerminateConnection:
669                 case eAbort:
670                         break;
671                 }
672         }
673         else if (rc < 0) {
674                 if (errno != EAGAIN) {
675                         StopClientWatchers(IO, 1);
676                         EV_syslog(LOG_DEBUG,
677                                   "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n",
678                                   errno, strerror(errno), IO->SendBuf.fd);
679                         StrBufPrintf(IO->ErrMsg,
680                                      "Socket Invalid! [%s]",
681                                      strerror(errno));
682                         SetNextTimeout(IO, 0.01);
683                 }
684         }
685         /* else : must write more. */
686 }
687 static void
688 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
689 {
690         ev_timer_stop(event_base, &IO->conn_fail);
691         ev_timer_start(event_base, &IO->rw_timeout);
692
693         switch(IO->NextState) {
694         case eReadMore:
695         case eReadMessage:
696         case eReadFile:
697                 StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0);
698                 ev_io_start(event_base, &IO->recv_event);
699                 break;
700         case eSendReply:
701         case eSendMore:
702         case eReadPayload:
703         case eSendFile:
704                 become_session(IO->CitContext);
705                 IO_send_callback(loop, &IO->send_event, revents);
706                 break;
707         case eDBQuery:
708         case eSendDNSQuery:
709         case eReadDNSReply:
710         case eConnect:
711         case eTerminateConnection:
712         case eAbort:
713                 /// TODO: WHUT?
714                 break;
715         }
716 }
717
718 static void
719 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
720 {
721         AsyncIO *IO = watcher->data;
722
723         SetEVState(IO, eIOTimeout);
724         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
725         ev_timer_stop (event_base, &IO->rw_timeout);
726         become_session(IO->CitContext);
727
728         if (IO->SendBuf.fd != 0)
729         {
730                 ev_io_stop(event_base, &IO->send_event);
731                 ev_io_stop(event_base, &IO->recv_event);
732                 ev_timer_stop (event_base, &IO->rw_timeout);
733                 close(IO->SendBuf.fd);
734                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
735         }
736
737         assert(IO->Timeout);
738         switch (IO->Timeout(IO))
739         {
740         case eAbort:
741                 ShutDownCLient(IO);
742         default:
743                 break;
744         }
745 }
746
747 static void
748 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
749 {
750         AsyncIO *IO = watcher->data;
751
752         SetEVState(IO, eIOConnfail);
753         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
754         ev_timer_stop (event_base, &IO->conn_fail);
755
756         if (IO->SendBuf.fd != 0)
757         {
758                 ev_io_stop(loop, &IO->conn_event);
759                 ev_io_stop(event_base, &IO->send_event);
760                 ev_io_stop(event_base, &IO->recv_event);
761                 ev_timer_stop (event_base, &IO->rw_timeout);
762                 close(IO->SendBuf.fd);
763                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
764         }
765         become_session(IO->CitContext);
766
767         assert(IO->ConnFail);
768         switch (IO->ConnFail(IO))
769         {
770         case eAbort:
771                 ShutDownCLient(IO);
772         default:
773                 break;
774
775         }
776 }
777
778 static void
779 IO_connfailimmediate_callback(struct ev_loop *loop,
780                               ev_idle *watcher,
781                               int revents)
782 {
783         AsyncIO *IO = watcher->data;
784
785         SetEVState(IO, eIOConnfailNow);
786         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
787         ev_idle_stop (event_base, &IO->conn_fail_immediate);
788
789         if (IO->SendBuf.fd != 0)
790         {
791                 close(IO->SendBuf.fd);
792                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
793         }
794         become_session(IO->CitContext);
795
796         assert(IO->ConnFail);
797         switch (IO->ConnFail(IO))
798         {
799         case eAbort:
800                 ShutDownCLient(IO);
801         default:
802                 break;
803
804         }
805 }
806
807 static void
808 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
809 {
810         AsyncIO *IO = watcher->data;
811         int             so_err = 0;
812         socklen_t       lon = sizeof(so_err);
813         int             err;
814
815         SetEVState(IO, eIOConnNow);
816         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
817         EVM_syslog(LOG_DEBUG, "connect() succeeded.\n");
818
819         ev_io_stop(loop, &IO->conn_event);
820         ev_timer_stop(event_base, &IO->conn_fail);
821
822         err = getsockopt(IO->SendBuf.fd,
823                          SOL_SOCKET,
824                          SO_ERROR,
825                          (void*)&so_err,
826                          &lon);
827
828         if ((err == 0) && (so_err != 0))
829         {
830                 EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n",
831                           so_err,
832                           strerror(so_err));
833                 IO_connfail_callback(loop, &IO->conn_fail, revents);
834
835         }
836         else
837         {
838                 EVM_syslog(LOG_DEBUG, "connect() succeeded\n");
839                 set_start_callback(loop, IO, revents);
840         }
841 }
842
843 static void
844 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
845 {
846         const char *errmsg;
847         ssize_t nbytes;
848         AsyncIO *IO = watcher->data;
849
850         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
851         switch (IO->NextState) {
852         case eReadFile:
853                 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
854                 if (nbytes < 0)
855                         StrBufPlain(IO->ErrMsg, errmsg, -1);
856                 else
857                 {
858                         if (IO->IOB.ChunkSendRemain == 0)
859                         {
860                                 IO->NextState = eSendReply;
861                                 assert(IO->ReadDone);
862                                 ev_io_stop(event_base, &IO->recv_event);
863                                 PostInbound(IO);
864                                 return;
865                         }
866                         else
867                                 return;
868                 }
869                 break;
870         default:
871                 nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd,
872                                                         0,
873                                                         &IO->RecvBuf);
874                 break;
875         }
876
877 #ifdef BIGBAD_IODBG
878         {
879                 long nbytes;
880                 int rv = 0;
881                 char fn [SIZ];
882                 FILE *fd;
883                 const char *pch = ChrPtr(IO->RecvBuf.Buf);
884                 const char *pchh = IO->RecvBuf.ReadWritePointer;
885
886                 if (pchh == NULL)
887                         pchh = pch;
888
889                 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
890                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
891                          ((CitContext*)(IO->CitContext))->ServiceName,
892                          IO->SendBuf.fd);
893
894                 fd = fopen(fn, "a+");
895                 if (fd == NULL) {
896                         syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno));
897                         cit_backtrace();
898                         exit(1);
899                 }
900                 fprintf(fd, "Read: BufSize: %ld BufContent: [",
901                         nbytes);
902                 rv = fwrite(pchh, nbytes, 1, fd);
903                 if (!rv) printf("failed to write debug to %s!\n", fn);
904                 fprintf(fd, "]\n");
905                 fclose(fd);
906         }
907 #endif
908         if (nbytes > 0) {
909                 HandleInbound(IO);
910         } else if (nbytes == 0) {
911                 StopClientWatchers(IO, 1);
912                 SetNextTimeout(IO, 0.01);
913                 return;
914         } else if (nbytes == -1) {
915                 if (errno != EAGAIN) {
916                         // FD is gone. kick it. 
917                         StopClientWatchers(IO, 1);
918                         EV_syslog(LOG_DEBUG,
919                                   "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n",
920                                   errno, strerror(errno), IO->SendBuf.fd);
921                         StrBufPrintf(IO->ErrMsg,
922                                      "Socket Invalid! [%s]",
923                                      strerror(errno));
924                         SetNextTimeout(IO, 0.01);
925                 }
926                 return;
927         }
928 }
929
930 void
931 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
932 {
933         AsyncIO *IO = watcher->data;
934
935         SetEVState(IO, eCaresFinished);
936         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
937         EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
938         become_session(IO->CitContext);
939         assert(IO->DNS.Query->PostDNS);
940         switch (IO->DNS.Query->PostDNS(IO))
941         {
942         case eAbort:
943                 assert(IO->DNS.Fail);
944                 switch (IO->DNS.Fail(IO)) {
945                 case eAbort:
946 ////                    StopClientWatchers(IO);
947                         ShutDownCLient(IO);
948                         break;
949                 case eDBQuery:
950                         StopClientWatchers(IO, 0);
951                         QueueAnDBOperation(IO);
952                         break;
953                 default:
954                         break;
955                 }
956         case eDBQuery:
957                 StopClientWatchers(IO, 0);
958                 QueueAnDBOperation(IO);
959                 break;
960         default:
961                 break;
962         }
963 }
964
965
966 eNextState EvConnectSock(AsyncIO *IO,
967                          double conn_timeout,
968                          double first_rw_timeout,
969                          int ReadFirst)
970 {
971         struct sockaddr_in egress_sin;
972         int fdflags;
973         int rc = -1;
974
975         SetEVState(IO, eIOConnectSock);
976         become_session(IO->CitContext);
977
978         if (ReadFirst) {
979                 IO->NextState = eReadMessage;
980         }
981         else {
982                 IO->NextState = eSendReply;
983         }
984
985         IO->SendBuf.fd = IO->RecvBuf.fd =
986                 socket(
987                         (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
988                         SOCK_STREAM,
989                         IPPROTO_TCP);
990
991         if (IO->SendBuf.fd < 0) {
992                 EV_syslog(LOG_ERR,
993                           "EVENT: socket() failed: %s\n",
994                           strerror(errno));
995
996                 StrBufPrintf(IO->ErrMsg,
997                              "Failed to create socket: %s",
998                              strerror(errno));
999                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
1000                 return eAbort;
1001         }
1002         fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
1003         if (fdflags < 0) {
1004                 EV_syslog(LOG_ERR,
1005                           "EVENT: unable to get socket %d flags! %s \n",
1006                           IO->SendBuf.fd,
1007                           strerror(errno));
1008                 StrBufPrintf(IO->ErrMsg,
1009                              "Failed to get socket %d flags: %s",
1010                              IO->SendBuf.fd,
1011                              strerror(errno));
1012                 close(IO->SendBuf.fd);
1013                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
1014                 return eAbort;
1015         }
1016         fdflags = fdflags | O_NONBLOCK;
1017         if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
1018                 EV_syslog(
1019                         LOG_ERR,
1020                         "EVENT: unable to set socket %d nonblocking flags! %s \n",
1021                         IO->SendBuf.fd,
1022                         strerror(errno));
1023                 StrBufPrintf(IO->ErrMsg,
1024                              "Failed to set socket flags: %s",
1025                              strerror(errno));
1026                 close(IO->SendBuf.fd);
1027                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
1028                 return eAbort;
1029         }
1030 /* TODO: maye we could use offsetof() to calc the position of data...
1031  * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
1032  */
1033         ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
1034         IO->recv_event.data = IO;
1035         ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
1036         IO->send_event.data = IO;
1037
1038         ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
1039         IO->conn_fail.data = IO;
1040         ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
1041         IO->rw_timeout.data = IO;
1042
1043
1044
1045
1046         /* for debugging you may bypass it like this:
1047          * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
1048          * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
1049          *   inet_addr("127.0.0.1");
1050          */
1051         if (IO->ConnectMe->IPv6) {
1052                 rc = connect(IO->SendBuf.fd,
1053                              &IO->ConnectMe->Addr,
1054                              sizeof(struct sockaddr_in6));
1055         }
1056         else {
1057                 /* If citserver is bound to a specific IP address on the host, make
1058                  * sure we use that address for outbound connections.
1059                  */
1060         
1061                 memset(&egress_sin, 0, sizeof(egress_sin));
1062                 egress_sin.sin_family = AF_INET;
1063                 if (!IsEmptyStr(CtdlGetConfigStr("c_ip_addr"))) {
1064                         egress_sin.sin_addr.s_addr = inet_addr(CtdlGetConfigStr("c_ip_addr"));
1065                         if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
1066                                 egress_sin.sin_addr.s_addr = INADDR_ANY;
1067                         }
1068
1069                         /* If this bind fails, no problem; we can still use INADDR_ANY */
1070                         bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
1071                 }
1072                 rc = connect(IO->SendBuf.fd,
1073                              (struct sockaddr_in *)&IO->ConnectMe->Addr,
1074                              sizeof(struct sockaddr_in));
1075         }
1076
1077         if (rc >= 0){
1078                 SetEVState(IO, eIOConnNow);
1079                 EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd);
1080                 set_start_callback(event_base, IO, 0);
1081                 return IO->NextState;
1082         }
1083         else if (errno == EINPROGRESS) {
1084                 SetEVState(IO, eIOConnWait);
1085                 EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd);
1086
1087                 ev_io_init(&IO->conn_event,
1088                            IO_connestd_callback,
1089                            IO->SendBuf.fd,
1090                            EV_READ|EV_WRITE);
1091
1092                 IO->conn_event.data = IO;
1093
1094                 ev_io_start(event_base, &IO->conn_event);
1095                 ev_timer_start(event_base, &IO->conn_fail);
1096                 return IO->NextState;
1097         }
1098         else {
1099                 SetEVState(IO, eIOConnfail);
1100                 ev_idle_init(&IO->conn_fail_immediate,
1101                              IO_connfailimmediate_callback);
1102                 IO->conn_fail_immediate.data = IO;
1103                 ev_idle_start(event_base, &IO->conn_fail_immediate);
1104
1105                 EV_syslog(LOG_ERR,
1106                           "connect() = %d failed: %s\n",
1107                           IO->SendBuf.fd,
1108                           strerror(errno));
1109
1110                 StrBufPrintf(IO->ErrMsg,
1111                              "Failed to connect: %s",
1112                              strerror(errno));
1113                 return IO->NextState;
1114         }
1115         return IO->NextState;
1116 }
1117
1118 void SetNextTimeout(AsyncIO *IO, double timeout)
1119 {
1120         IO->rw_timeout.repeat = timeout;
1121         ev_timer_again (event_base,  &IO->rw_timeout);
1122 }
1123
1124
1125 eNextState ReAttachIO(AsyncIO *IO,
1126                       void *pData,
1127                       int ReadFirst)
1128 {
1129         SetEVState(IO, eIOAttach);
1130         IO->Data = pData;
1131         become_session(IO->CitContext);
1132         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
1133         if (ReadFirst) {
1134                 IO->NextState = eReadMessage;
1135         }
1136         else {
1137                 IO->NextState = eSendReply;
1138         }
1139         set_start_callback(event_base, IO, 0);
1140
1141         return IO->NextState;
1142 }
1143
1144 void InitIOStruct(AsyncIO *IO,
1145                   void *Data,
1146                   eNextState NextState,
1147                   IO_LineReaderCallback LineReader,
1148                   IO_CallBack DNS_Fail,
1149                   IO_CallBack SendDone,
1150                   IO_CallBack ReadDone,
1151                   IO_CallBack Terminate,
1152                   IO_CallBack DBTerminate,
1153                   IO_CallBack ConnFail,
1154                   IO_CallBack Timeout,
1155                   IO_CallBack ShutdownAbort)
1156 {
1157         IO->Data          = Data;
1158
1159         IO->CitContext    = CloneContext(CC);
1160         IO->CitContext->session_specific_data = Data;
1161         IO->CitContext->IO = IO;
1162
1163         IO->NextState     = NextState;
1164
1165         IO->SendDone      = SendDone;
1166         IO->ReadDone      = ReadDone;
1167         IO->Terminate     = Terminate;
1168         IO->DBTerminate   = DBTerminate;
1169         IO->LineReader    = LineReader;
1170         IO->ConnFail      = ConnFail;
1171         IO->Timeout       = Timeout;
1172         IO->ShutdownAbort = ShutdownAbort;
1173
1174         IO->DNS.Fail      = DNS_Fail;
1175
1176         IO->SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
1177         IO->RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
1178         IO->IOBuf         = NewStrBuf();
1179         EV_syslog(LOG_DEBUG,
1180                   "EVENT: Session lives at %p IO at %p \n",
1181                   Data, IO);
1182
1183 }
1184
1185 extern int evcurl_init(AsyncIO *IO);
1186
1187 int InitcURLIOStruct(AsyncIO *IO,
1188                      void *Data,
1189                      const char* Desc,
1190                      IO_CallBack SendDone,
1191                      IO_CallBack Terminate,
1192                      IO_CallBack DBTerminate,
1193                      IO_CallBack ShutdownAbort)
1194 {
1195         IO->Data          = Data;
1196
1197         IO->CitContext    = CloneContext(CC);
1198         IO->CitContext->session_specific_data = Data;
1199         IO->CitContext->IO = IO;
1200
1201         IO->SendDone      = SendDone;
1202         IO->Terminate     = Terminate;
1203         IO->DBTerminate   = DBTerminate;
1204         IO->ShutdownAbort = ShutdownAbort;
1205
1206         strcpy(IO->HttpReq.errdesc, Desc);
1207
1208
1209         return  evcurl_init(IO);
1210
1211 }
1212
1213
1214 typedef struct KillOtherSessionContext {
1215         AsyncIO IO;
1216         AsyncIO *OtherOne;
1217 }KillOtherSessionContext;
1218
1219 eNextState KillTerminate(AsyncIO *IO)
1220 {
1221         long id;
1222         KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data;
1223         EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__);
1224         id = IO->ID;
1225         FreeAsyncIOContents(IO);
1226         memset(Ctx, 0, sizeof(KillOtherSessionContext));
1227         IO->ID = id; /* just for the case we want to analyze it in a coredump */
1228         free(Ctx);
1229         return eAbort;
1230
1231 }
1232
1233 eNextState KillShutdown(AsyncIO *IO)
1234 {
1235         return eTerminateConnection;
1236 }
1237
1238 eNextState KillOtherContextNow(AsyncIO *IO)
1239 {
1240         KillOtherSessionContext *Ctx = IO->Data;
1241
1242         SetEVState(IO, eKill);
1243
1244         if (Ctx->OtherOne->ShutdownAbort != NULL) {
1245                 Ctx->OtherOne->NextState = eAbort;
1246                 if (Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne) == eDBQuery) {
1247                         StopClientWatchers(Ctx->OtherOne, 0);
1248                         QueueAnDBOperation(Ctx->OtherOne);
1249                 }
1250         }
1251         return eTerminateConnection;
1252 }
1253
1254 void KillAsyncIOContext(AsyncIO *IO)
1255 {
1256         KillOtherSessionContext *Ctx;
1257
1258         Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext));
1259         memset(Ctx, 0, sizeof(KillOtherSessionContext));
1260         
1261         InitIOStruct(&Ctx->IO,
1262                      Ctx,
1263                      eReadMessage,
1264                      NULL,
1265                      NULL,
1266                      NULL,
1267                      NULL,
1268                      KillTerminate,
1269                      NULL,
1270                      NULL,
1271                      NULL,
1272                      KillShutdown);
1273
1274         Ctx->OtherOne = IO;
1275
1276         switch(IO->NextState) {
1277         case eSendDNSQuery:
1278         case eReadDNSReply:
1279
1280         case eConnect:
1281         case eSendReply:
1282         case eSendMore:
1283         case eSendFile:
1284
1285         case eReadMessage:
1286         case eReadMore:
1287         case eReadPayload:
1288         case eReadFile:
1289                 Ctx->IO.ReAttachCB = KillOtherContextNow;
1290                 QueueAnEventContext(&Ctx->IO);
1291                 break;
1292         case eDBQuery:
1293                 Ctx->IO.ReAttachCB = KillOtherContextNow;
1294                 QueueAnDBOperation(&Ctx->IO);
1295                 break;
1296         case eTerminateConnection:
1297         case eAbort:
1298                 /*hm, its already dying, dunno which Queue its in... */
1299                 free(Ctx);
1300         }
1301         
1302 }
1303
1304 extern int DebugEventLoopBacktrace;
1305 void EV_backtrace(AsyncIO *IO)
1306 {
1307 #ifdef HAVE_BACKTRACE
1308         void *stack_frames[50];
1309         size_t size, i;
1310         char **strings;
1311
1312         if ((IO == NULL) || (DebugEventLoopBacktrace == 0))
1313                 return;
1314         size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
1315         strings = backtrace_symbols(stack_frames, size);
1316         for (i = 0; i < size; i++) {
1317                 if (strings != NULL) {
1318                         EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
1319                 }
1320                 else {
1321                         EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
1322                 }
1323         }
1324         free(strings);
1325 #endif
1326 }
1327
1328
1329 ev_tstamp ctdl_ev_now (void)
1330 {
1331         return ev_now(event_base);
1332 }