Instead of initiating the transition between DB & IO-Queue from within the applicatio...
[citadel.git] / citadel / event_client.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  * This program is open source software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 3.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  */
12 #include "sysdep.h"
13
14 #include <stdio.h>
15 #include <string.h>
16 #include <syslog.h>
17 #include <assert.h>
18 #include <sys/socket.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21 #if HAVE_BACKTRACE
22 #include <execinfo.h>
23 #endif
24
25 #include <libcitadel.h>
26
27 #include "ctdl_module.h"
28 #include "event_client.h"
29
30 ConstStr IOStates[] = {
31         {HKEY("DB Queue")},
32         {HKEY("DB Q Next")},
33         {HKEY("DB Attach")},
34         {HKEY("DB Next")},
35         {HKEY("DB Stop")},
36         {HKEY("DB Exit")},
37         {HKEY("DB Terminate")},
38         {HKEY("IO Queue")},
39         {HKEY("IO Attach")},
40         {HKEY("IO Connect Socket")},
41         {HKEY("IO Abort")},
42         {HKEY("IO Timeout")},
43         {HKEY("IO ConnFail")},
44         {HKEY("IO ConnFail Now")},
45         {HKEY("IO Conn Now")},
46         {HKEY("IO Conn Wait")},
47         {HKEY("Curl Q")},
48         {HKEY("Curl Start")},
49         {HKEY("Curl Shotdown")},
50         {HKEY("Curl More IO")},
51         {HKEY("Curl Got IO")},
52         {HKEY("Curl Got Data")},
53         {HKEY("Curl Got Status")},
54         {HKEY("C-Ares Start")},
55         {HKEY("C-Ares IO Done")},
56         {HKEY("C-Ares Finished")},
57         {HKEY("C-Ares exit")},
58         {HKEY("Killing")},
59         {HKEY("Exit")}
60 };
61
62 void SetEVState(AsyncIO *IO, eIOState State)
63 {
64
65         CitContext* CCC = IO->CitContext;
66         if (CCC != NULL)
67                 memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1);
68
69 }
70
71
72 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
73 static void IO_abort_shutdown_callback(struct ev_loop *loop,
74                                        ev_cleanup *watcher,
75                                        int revents);
76
77
78 /*------------------------------------------------------------------------------
79  *                              Server DB IO
80  *----------------------------------------------------------------------------*/
81 extern int evdb_count;
82 extern pthread_mutex_t DBEventQueueMutex;
83 extern pthread_mutex_t DBEventExitQueueMutex;
84 extern HashList *DBInboundEventQueue;
85 extern struct ev_loop *event_db;
86 extern ev_async DBAddJob;
87 extern ev_async DBExitEventLoop;
88
89 eNextState QueueAnDBOperation(AsyncIO *IO)
90 {
91         IOAddHandler *h;
92         int i;
93
94         SetEVState(IO, eDBQ);
95         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
96         h->IO = IO;
97
98         assert(IO->ReAttachCB != NULL);
99
100         h->EvAttch = IO->ReAttachCB;
101         ev_cleanup_init(&IO->db_abort_by_shutdown,
102                         IO_abort_shutdown_callback);
103         IO->db_abort_by_shutdown.data = IO;
104
105         pthread_mutex_lock(&DBEventQueueMutex);
106         if (DBInboundEventQueue == NULL)
107         {
108                 /* shutting down... */
109                 free(h);
110                 EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
111                 pthread_mutex_unlock(&DBEventQueueMutex);
112                 return eAbort;
113         }
114         EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
115         i = ++evdb_count ;
116         Put(DBInboundEventQueue, IKEY(i), h, NULL);
117         pthread_mutex_unlock(&DBEventQueueMutex);
118
119         pthread_mutex_lock(&DBEventExitQueueMutex);
120         if (event_db == NULL)
121         {
122                 pthread_mutex_unlock(&DBEventExitQueueMutex);
123                 return eAbort;
124         }
125         ev_async_send (event_db, &DBAddJob);
126         pthread_mutex_unlock(&DBEventExitQueueMutex);
127
128         EVQM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
129         return eDBQuery;
130 }
131
132 void StopDBWatchers(AsyncIO *IO)
133 {
134         SetEVState(IO, eDBStop);
135         ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
136         ev_idle_stop(event_db, &IO->db_unwind_stack);
137 }
138
139 void ShutDownDBCLient(AsyncIO *IO)
140 {
141         CitContext *Ctx =IO->CitContext;
142         become_session(Ctx);
143
144         SetEVState(IO, eDBTerm);
145         EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
146         StopDBWatchers(IO);
147
148         assert(IO->DBTerminate);
149         IO->DBTerminate(IO);
150 }
151
152 void
153 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
154 {
155         AsyncIO *IO = watcher->data;
156
157         SetEVState(IO, eDBNext);
158         IO->Now = ev_now(event_db);
159         EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
160         become_session(IO->CitContext);
161
162         ev_idle_stop(event_db, &IO->db_unwind_stack);
163
164         assert(IO->NextDBOperation);
165         switch (IO->NextDBOperation(IO))
166         {
167         case eDBQuery:
168                 break;
169         case eSendDNSQuery:
170         case eReadDNSReply:
171         case eConnect:
172         case eSendReply:
173         case eSendMore:
174         case eSendFile:
175         case eReadMessage:
176         case eReadMore:
177         case eReadPayload:
178         case eReadFile:
179                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
180                 break;
181         case eTerminateConnection:
182         case eAbort:
183                 ev_idle_stop(event_db, &IO->db_unwind_stack);
184                 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
185                 ShutDownDBCLient(IO);
186         }
187 }
188
189 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
190 {
191         SetEVState(IO, eQDBNext);
192         IO->NextDBOperation = CB;
193         ev_idle_init(&IO->db_unwind_stack,
194                      DB_PerformNext);
195         IO->db_unwind_stack.data = IO;
196         ev_idle_start(event_db, &IO->db_unwind_stack);
197         return eDBQuery;
198 }
199
200 /*------------------------------------------------------------------------------
201  *                      Client IO
202  *----------------------------------------------------------------------------*/
203 extern int evbase_count;
204 extern pthread_mutex_t EventQueueMutex;
205 extern pthread_mutex_t EventExitQueueMutex; 
206 extern HashList *InboundEventQueue;
207 extern struct ev_loop *event_base;
208 extern ev_async AddJob;
209 extern ev_async ExitEventLoop;
210
211 static void IO_abort_shutdown_callback(struct ev_loop *loop,
212                                        ev_cleanup *watcher,
213                                        int revents)
214 {
215         AsyncIO *IO = watcher->data;
216
217         SetEVState(IO, eIOAbort);
218         EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
219         IO->Now = ev_now(event_base);
220         assert(IO->ShutdownAbort);
221         IO->ShutdownAbort(IO);
222 }
223
224
225 eNextState QueueAnEventContext(AsyncIO *IO)
226 {
227         IOAddHandler *h;
228         int i;
229
230         SetEVState(IO, eIOQ);
231         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
232         h->IO = IO;
233
234         assert(IO->ReAttachCB != NULL);
235
236         h->EvAttch = IO->ReAttachCB;
237
238         ev_cleanup_init(&IO->abort_by_shutdown,
239                         IO_abort_shutdown_callback);
240         IO->abort_by_shutdown.data = IO;
241
242         pthread_mutex_lock(&EventQueueMutex);
243         if (InboundEventQueue == NULL)
244         {
245                 free(h);
246                 /* shutting down... */
247                 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
248                 pthread_mutex_unlock(&EventQueueMutex);
249                 return eAbort;
250         }
251         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
252         i = ++evbase_count;
253         Put(InboundEventQueue, IKEY(i), h, NULL);
254         pthread_mutex_unlock(&EventQueueMutex);
255
256         pthread_mutex_lock(&EventExitQueueMutex);
257         if (event_base == NULL) {
258                 pthread_mutex_unlock(&EventExitQueueMutex);
259                 return eAbort;
260         }
261         ev_async_send (event_base, &AddJob);
262         pthread_mutex_unlock(&EventExitQueueMutex);
263         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
264         return eSendReply;
265 }
266
267 eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB, int CloseFDs)
268 {
269         StopClientWatchers(IO, CloseFDs);
270         IO->ReAttachCB = CB;
271         return eDBQuery;
272 }
273 eNextState DBQueueEventContext(AsyncIO *IO, IO_CallBack CB)
274 {
275         StopDBWatchers(IO);
276         IO->ReAttachCB = CB;
277         return eSendReply;
278 }
279
280 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
281 {
282         IO->ReAttachCB = CB;
283         return QueueAnEventContext(IO);
284 }
285
286 extern eNextState evcurl_handle_start(AsyncIO *IO);
287
288 eNextState QueueCurlContext(AsyncIO *IO)
289 {
290         IOAddHandler *h;
291         int i;
292
293         SetEVState(IO, eCurlQ);
294         h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
295         h->IO = IO;
296         h->EvAttch = evcurl_handle_start;
297
298         pthread_mutex_lock(&EventQueueMutex);
299         if (InboundEventQueue == NULL)
300         {
301                 /* shutting down... */
302                 free(h);
303                 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
304                 pthread_mutex_unlock(&EventQueueMutex);
305                 return eAbort;
306         }
307
308         EVM_syslog(LOG_DEBUG, "EVENT Q\n");
309         i = ++evbase_count;
310         Put(InboundEventQueue, IKEY(i), h, NULL);
311         pthread_mutex_unlock(&EventQueueMutex);
312
313         pthread_mutex_lock(&EventExitQueueMutex);
314         if (event_base == NULL) {
315                 pthread_mutex_unlock(&EventExitQueueMutex);
316                 return eAbort;
317         }
318         ev_async_send (event_base, &AddJob);
319         pthread_mutex_unlock(&EventExitQueueMutex);
320
321         EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
322         return eSendReply;
323 }
324
325 eNextState CurlQueueDBOperation(AsyncIO *IO, IO_CallBack CB)
326 {
327         StopCurlWatchers(IO);
328         IO->ReAttachCB = CB;
329         return eDBQuery;
330 }
331
332
333 void FreeAsyncIOContents(AsyncIO *IO)
334 {
335         CitContext *Ctx = IO->CitContext;
336
337         FreeStrBuf(&IO->IOBuf);
338         FreeStrBuf(&IO->SendBuf.Buf);
339         FreeStrBuf(&IO->RecvBuf.Buf);
340
341         FreeURL(&IO->ConnectMe);
342         FreeStrBuf(&IO->HttpReq.ReplyData);
343
344         if (Ctx) {
345                 Ctx->state = CON_IDLE;
346                 Ctx->kill_me = 1;
347                 IO->CitContext = NULL;
348         }
349 }
350
351
352 void DestructCAres(AsyncIO *IO);
353 void StopClientWatchers(AsyncIO *IO, int CloseFD)
354 {
355         EVM_syslog(LOG_DEBUG, "EVENT StopClientWatchers");
356         
357         DestructCAres(IO);
358
359         ev_timer_stop (event_base, &IO->rw_timeout);
360         ev_timer_stop(event_base, &IO->conn_fail);
361         ev_idle_stop(event_base, &IO->unwind_stack);
362         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
363
364         ev_io_stop(event_base, &IO->conn_event);
365         ev_io_stop(event_base, &IO->send_event);
366         ev_io_stop(event_base, &IO->recv_event);
367
368         if (CloseFD && (IO->SendBuf.fd > 0)) {
369                 close(IO->SendBuf.fd);
370                 IO->SendBuf.fd = 0;
371                 IO->RecvBuf.fd = 0;
372         }
373 }
374
375 void StopCurlWatchers(AsyncIO *IO)
376 {
377         EVM_syslog(LOG_DEBUG, "EVENT StopCurlWatchers \n");
378
379         ev_timer_stop (event_base, &IO->rw_timeout);
380         ev_timer_stop(event_base, &IO->conn_fail);
381         ev_idle_stop(event_base, &IO->unwind_stack);
382         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
383
384         ev_io_stop(event_base, &IO->conn_event);
385         ev_io_stop(event_base, &IO->send_event);
386         ev_io_stop(event_base, &IO->recv_event);
387
388         curl_easy_cleanup(IO->HttpReq.chnd);
389         IO->HttpReq.chnd = NULL;
390
391         if (IO->SendBuf.fd != 0) {
392                 close(IO->SendBuf.fd);
393         }
394         IO->SendBuf.fd = 0;
395         IO->RecvBuf.fd = 0;
396 }
397
398 void ShutDownCLient(AsyncIO *IO)
399 {
400         CitContext *Ctx =IO->CitContext;
401
402         SetEVState(IO, eExit);
403         become_session(Ctx);
404
405         EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
406
407         StopClientWatchers(IO, 1);
408
409         if (IO->DNS.Channel != NULL) {
410                 ares_destroy(IO->DNS.Channel);
411                 EV_DNS_LOG_STOP(DNS.recv_event);
412                 EV_DNS_LOG_STOP(DNS.send_event);
413                 ev_io_stop(event_base, &IO->DNS.recv_event);
414                 ev_io_stop(event_base, &IO->DNS.send_event);
415                 IO->DNS.Channel = NULL;
416         }
417         assert(IO->Terminate);
418         IO->Terminate(IO);
419 }
420
421 void PostInbound(AsyncIO *IO)
422 {
423         switch (IO->NextState) {
424         case eSendFile:
425                 ev_io_start(event_base, &IO->send_event);
426                 break;
427         case eSendReply:
428         case eSendMore:
429                 assert(IO->SendDone);
430                 IO->NextState = IO->SendDone(IO);
431                 switch (IO->NextState)
432                 {
433                 case eSendFile:
434                 case eSendReply:
435                 case eSendMore:
436                 case eReadMessage:
437                 case eReadPayload:
438                 case eReadMore:
439                 case eReadFile:
440                         ev_io_start(event_base, &IO->send_event);
441                         break;
442                 case eDBQuery:
443                         StopClientWatchers(IO, 0);
444                 default:
445                         break;
446                 }
447                 break;
448         case eReadPayload:
449         case eReadMore:
450         case eReadFile:
451                 ev_io_start(event_base, &IO->recv_event);
452                 break;
453         case eTerminateConnection:
454                 ShutDownCLient(IO);
455                 break;
456         case eAbort:
457                 ShutDownCLient(IO);
458                 break;
459         case eSendDNSQuery:
460         case eReadDNSReply:
461         case eDBQuery:
462         case eConnect:
463         case eReadMessage:
464                 break;
465         }
466 }
467 eReadState HandleInbound(AsyncIO *IO)
468 {
469         const char *Err = NULL;
470         eReadState Finished = eBufferNotEmpty;
471
472         become_session(IO->CitContext);
473
474         while ((Finished == eBufferNotEmpty) &&
475                ((IO->NextState == eReadMessage)||
476                 (IO->NextState == eReadMore)||
477                 (IO->NextState == eReadFile)||
478                 (IO->NextState == eReadPayload)))
479         {
480                 /* Reading lines...
481                  * lex line reply in callback,
482                  * or do it ourselves.
483                  * i.e. as nnn-blabla means continue reading in SMTP
484                  */
485                 if ((IO->NextState == eReadFile) &&
486                     (Finished == eBufferNotEmpty))
487                 {
488                         Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
489                         if (Finished == eReadSuccess)
490                         {
491                                 IO->NextState = eSendReply;
492                         }
493                 }
494                 else if (IO->LineReader)
495                         Finished = IO->LineReader(IO);
496                 else
497                         Finished = StrBufChunkSipLine(IO->IOBuf,
498                                                       &IO->RecvBuf);
499
500                 switch (Finished) {
501                 case eMustReadMore: /// read new from socket...
502                         break;
503                 case eBufferNotEmpty: /* shouldn't happen... */
504                 case eReadSuccess: /// done for now...
505                         break;
506                 case eReadFail: /// WHUT?
507                                 ///todo: shut down!
508                         break;
509                 }
510
511                 if (Finished != eMustReadMore) {
512                         eNextState rc;
513                         assert(IO->ReadDone);
514                         ev_io_stop(event_base, &IO->recv_event);
515                         rc = IO->ReadDone(IO);
516                         if  (rc == eDBQuery) {
517                                 return QueueAnDBOperation(IO);
518                         }
519                         else {
520                                 IO->NextState = rc;
521                                 Finished = StrBufCheckBuffer(&IO->RecvBuf);
522                         }
523                 }
524         }
525
526         PostInbound(IO);
527
528         return Finished;
529 }
530
531
532 static void
533 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
534 {
535         int rc;
536         AsyncIO *IO = watcher->data;
537         const char *errmsg = NULL;
538
539         IO->Now = ev_now(event_base);
540         become_session(IO->CitContext);
541 #ifdef BIGBAD_IODBG
542         {
543                 int rv = 0;
544                 char fn [SIZ];
545                 FILE *fd;
546                 const char *pch = ChrPtr(IO->SendBuf.Buf);
547                 const char *pchh = IO->SendBuf.ReadWritePointer;
548                 long nbytes;
549
550                 if (pchh == NULL)
551                         pchh = pch;
552
553                 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
554                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
555                          ((CitContext*)(IO->CitContext))->ServiceName,
556                          IO->SendBuf.fd);
557
558                 fd = fopen(fn, "a+");
559                 fprintf(fd, "Send: BufSize: %ld BufContent: [",
560                         nbytes);
561                 rv = fwrite(pchh, nbytes, 1, fd);
562                 if (!rv) printf("failed to write debug to %s!\n", fn);
563                 fprintf(fd, "]\n");
564 #endif
565                 switch (IO->NextState) {
566                 case eSendFile:
567                         rc = FileSendChunked(&IO->IOB, &errmsg);
568                         if (rc < 0)
569                                 StrBufPlain(IO->ErrMsg, errmsg, -1);
570                         break;
571                 default:
572                         rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd,
573                                                              0,
574                                                              &IO->SendBuf);
575                 }
576
577 #ifdef BIGBAD_IODBG
578                 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
579                 fclose(fd);
580         }
581 #endif
582         if (rc == 0)
583         {
584                 ev_io_stop(event_base, &IO->send_event);
585                 switch (IO->NextState) {
586                 case eSendMore:
587                         assert(IO->SendDone);
588                         IO->NextState = IO->SendDone(IO);
589
590                         if ((IO->NextState == eTerminateConnection) ||
591                             (IO->NextState == eAbort) )
592                                 ShutDownCLient(IO);
593                         else {
594                                 ev_io_start(event_base, &IO->send_event);
595                         }
596                         break;
597                 case eSendFile:
598                         if (IO->IOB.ChunkSendRemain > 0) {
599                                 ev_io_start(event_base, &IO->recv_event);
600                                 SetNextTimeout(IO, 100.0);
601
602                         } else {
603                                 assert(IO->ReadDone);
604                                 IO->NextState = IO->ReadDone(IO);
605                                 switch(IO->NextState) {
606                                 case eSendDNSQuery:
607                                 case eReadDNSReply:
608                                 case eDBQuery:
609                                 case eConnect:
610                                         break;
611                                 case eSendReply:
612                                 case eSendMore:
613                                 case eSendFile:
614                                         ev_io_start(event_base,
615                                                     &IO->send_event);
616                                         break;
617                                 case eReadMessage:
618                                 case eReadMore:
619                                 case eReadPayload:
620                                 case eReadFile:
621                                         break;
622                                 case eTerminateConnection:
623                                 case eAbort:
624                                         break;
625                                 }
626                         }
627                         break;
628                 case eSendReply:
629                     if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
630                         break;
631                     IO->NextState = eReadMore;
632                 case eReadMore:
633                 case eReadMessage:
634                 case eReadPayload:
635                 case eReadFile:
636                         if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
637                         {
638                                 HandleInbound(IO);
639                         }
640                         else {
641                                 ev_io_start(event_base, &IO->recv_event);
642                         }
643
644                         break;
645                 case eDBQuery:
646                         /*
647                          * we now live in another queue,
648                          * so we have to unregister.
649                          */
650                         ev_cleanup_stop(loop, &IO->abort_by_shutdown);
651                         break;
652                 case eSendDNSQuery:
653                 case eReadDNSReply:
654                 case eConnect:
655                 case eTerminateConnection:
656                 case eAbort:
657                         break;
658                 }
659         }
660         else if (rc < 0) {
661                 if (errno != EAGAIN) {
662                         StopClientWatchers(IO, 1);
663                         EV_syslog(LOG_DEBUG,
664                                   "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n",
665                                   errno, strerror(errno), IO->SendBuf.fd);
666                         StrBufPrintf(IO->ErrMsg,
667                                      "Socket Invalid! [%s]",
668                                      strerror(errno));
669                         SetNextTimeout(IO, 0.01);
670                 }
671         }
672         /* else : must write more. */
673 }
674 static void
675 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
676 {
677         ev_timer_stop(event_base, &IO->conn_fail);
678         ev_timer_start(event_base, &IO->rw_timeout);
679
680         switch(IO->NextState) {
681         case eReadMore:
682         case eReadMessage:
683         case eReadFile:
684                 StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0);
685                 ev_io_start(event_base, &IO->recv_event);
686                 break;
687         case eSendReply:
688         case eSendMore:
689         case eReadPayload:
690         case eSendFile:
691                 become_session(IO->CitContext);
692                 IO_send_callback(loop, &IO->send_event, revents);
693                 break;
694         case eDBQuery:
695         case eSendDNSQuery:
696         case eReadDNSReply:
697         case eConnect:
698         case eTerminateConnection:
699         case eAbort:
700                 /// TODO: WHUT?
701                 break;
702         }
703 }
704
705 static void
706 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
707 {
708         AsyncIO *IO = watcher->data;
709
710         SetEVState(IO, eIOTimeout);
711         IO->Now = ev_now(event_base);
712         ev_timer_stop (event_base, &IO->rw_timeout);
713         become_session(IO->CitContext);
714
715         if (IO->SendBuf.fd != 0)
716         {
717                 ev_io_stop(event_base, &IO->send_event);
718                 ev_io_stop(event_base, &IO->recv_event);
719                 ev_timer_stop (event_base, &IO->rw_timeout);
720                 close(IO->SendBuf.fd);
721                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
722         }
723
724         assert(IO->Timeout);
725         switch (IO->Timeout(IO))
726         {
727         case eAbort:
728                 ShutDownCLient(IO);
729         default:
730                 break;
731         }
732 }
733
734 static void
735 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
736 {
737         AsyncIO *IO = watcher->data;
738
739         SetEVState(IO, eIOConnfail);
740         IO->Now = ev_now(event_base);
741         ev_timer_stop (event_base, &IO->conn_fail);
742
743         if (IO->SendBuf.fd != 0)
744         {
745                 ev_io_stop(loop, &IO->conn_event);
746                 ev_io_stop(event_base, &IO->send_event);
747                 ev_io_stop(event_base, &IO->recv_event);
748                 ev_timer_stop (event_base, &IO->rw_timeout);
749                 close(IO->SendBuf.fd);
750                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
751         }
752         become_session(IO->CitContext);
753
754         assert(IO->ConnFail);
755         switch (IO->ConnFail(IO))
756         {
757         case eAbort:
758                 ShutDownCLient(IO);
759         default:
760                 break;
761
762         }
763 }
764
765 static void
766 IO_connfailimmediate_callback(struct ev_loop *loop,
767                               ev_idle *watcher,
768                               int revents)
769 {
770         AsyncIO *IO = watcher->data;
771
772         SetEVState(IO, eIOConnfailNow);
773         IO->Now = ev_now(event_base);
774         ev_idle_stop (event_base, &IO->conn_fail_immediate);
775
776         if (IO->SendBuf.fd != 0)
777         {
778                 close(IO->SendBuf.fd);
779                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
780         }
781         become_session(IO->CitContext);
782
783         assert(IO->ConnFail);
784         switch (IO->ConnFail(IO))
785         {
786         case eAbort:
787                 ShutDownCLient(IO);
788         default:
789                 break;
790
791         }
792 }
793
794 static void
795 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
796 {
797         AsyncIO *IO = watcher->data;
798         int             so_err = 0;
799         socklen_t       lon = sizeof(so_err);
800         int             err;
801
802         SetEVState(IO, eIOConnNow);
803         IO->Now = ev_now(event_base);
804         EVM_syslog(LOG_DEBUG, "connect() succeeded.\n");
805
806         ev_io_stop(loop, &IO->conn_event);
807         ev_timer_stop(event_base, &IO->conn_fail);
808
809         err = getsockopt(IO->SendBuf.fd,
810                          SOL_SOCKET,
811                          SO_ERROR,
812                          (void*)&so_err,
813                          &lon);
814
815         if ((err == 0) && (so_err != 0))
816         {
817                 EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n",
818                           so_err,
819                           strerror(so_err));
820                 IO_connfail_callback(loop, &IO->conn_fail, revents);
821
822         }
823         else
824         {
825                 EVM_syslog(LOG_DEBUG, "connect() succeeded\n");
826                 set_start_callback(loop, IO, revents);
827         }
828 }
829
830 static void
831 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
832 {
833         const char *errmsg;
834         ssize_t nbytes;
835         AsyncIO *IO = watcher->data;
836
837         IO->Now = ev_now(event_base);
838         switch (IO->NextState) {
839         case eReadFile:
840                 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
841                 if (nbytes < 0)
842                         StrBufPlain(IO->ErrMsg, errmsg, -1);
843                 else
844                 {
845                         if (IO->IOB.ChunkSendRemain == 0)
846                         {
847                                 IO->NextState = eSendReply;
848                                 assert(IO->ReadDone);
849                                 ev_io_stop(event_base, &IO->recv_event);
850                                 PostInbound(IO);
851                                 return;
852                         }
853                         else
854                                 return;
855                 }
856                 break;
857         default:
858                 nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd,
859                                                         0,
860                                                         &IO->RecvBuf);
861                 break;
862         }
863
864 #ifdef BIGBAD_IODBG
865         {
866                 long nbytes;
867                 int rv = 0;
868                 char fn [SIZ];
869                 FILE *fd;
870                 const char *pch = ChrPtr(IO->RecvBuf.Buf);
871                 const char *pchh = IO->RecvBuf.ReadWritePointer;
872
873                 if (pchh == NULL)
874                         pchh = pch;
875
876                 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
877                 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
878                          ((CitContext*)(IO->CitContext))->ServiceName,
879                          IO->SendBuf.fd);
880
881                 fd = fopen(fn, "a+");
882                 fprintf(fd, "Read: BufSize: %ld BufContent: [",
883                         nbytes);
884                 rv = fwrite(pchh, nbytes, 1, fd);
885                 if (!rv) printf("failed to write debug to %s!\n", fn);
886                 fprintf(fd, "]\n");
887                 fclose(fd);
888         }
889 #endif
890         if (nbytes > 0) {
891                 HandleInbound(IO);
892         } else if (nbytes == 0) {
893                 StopClientWatchers(IO, 1);
894                 SetNextTimeout(IO, 0.01);
895                 return;
896         } else if (nbytes == -1) {
897                 if (errno != EAGAIN) {
898                         // FD is gone. kick it. 
899                         StopClientWatchers(IO, 1);
900                         EV_syslog(LOG_DEBUG,
901                                   "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n",
902                                   errno, strerror(errno), IO->SendBuf.fd);
903                         StrBufPrintf(IO->ErrMsg,
904                                      "Socket Invalid! [%s]",
905                                      strerror(errno));
906                         SetNextTimeout(IO, 0.01);
907                 }
908                 return;
909         }
910 }
911
912 void
913 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
914 {
915         AsyncIO *IO = watcher->data;
916
917         SetEVState(IO, eCaresFinished);
918         IO->Now = ev_now(event_base);
919         EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
920         become_session(IO->CitContext);
921         assert(IO->DNS.Query->PostDNS);
922         switch (IO->DNS.Query->PostDNS(IO))
923         {
924         case eAbort:
925                 assert(IO->DNS.Fail);
926                 switch (IO->DNS.Fail(IO)) {
927                 case eAbort:
928 ////                    StopClientWatchers(IO);
929                         ShutDownCLient(IO);
930                 default:
931                         break;
932                 }
933         default:
934                 break;
935         }
936 }
937
938
939 eNextState EvConnectSock(AsyncIO *IO,
940                          double conn_timeout,
941                          double first_rw_timeout,
942                          int ReadFirst)
943 {
944         struct sockaddr_in egress_sin;
945         int fdflags;
946         int rc = -1;
947
948         SetEVState(IO, eIOConnectSock);
949         become_session(IO->CitContext);
950
951         if (ReadFirst) {
952                 IO->NextState = eReadMessage;
953         }
954         else {
955                 IO->NextState = eSendReply;
956         }
957
958         IO->SendBuf.fd = IO->RecvBuf.fd =
959                 socket(
960                         (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
961                         SOCK_STREAM,
962                         IPPROTO_TCP);
963
964         if (IO->SendBuf.fd < 0) {
965                 EV_syslog(LOG_ERR,
966                           "EVENT: socket() failed: %s\n",
967                           strerror(errno));
968
969                 StrBufPrintf(IO->ErrMsg,
970                              "Failed to create socket: %s",
971                              strerror(errno));
972                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
973                 return eAbort;
974         }
975         fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
976         if (fdflags < 0) {
977                 EV_syslog(LOG_ERR,
978                           "EVENT: unable to get socket %d flags! %s \n",
979                           IO->SendBuf.fd,
980                           strerror(errno));
981                 StrBufPrintf(IO->ErrMsg,
982                              "Failed to get socket %d flags: %s",
983                              IO->SendBuf.fd,
984                              strerror(errno));
985                 close(IO->SendBuf.fd);
986                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
987                 return eAbort;
988         }
989         fdflags = fdflags | O_NONBLOCK;
990         if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
991                 EV_syslog(
992                         LOG_ERR,
993                         "EVENT: unable to set socket %d nonblocking flags! %s \n",
994                         IO->SendBuf.fd,
995                         strerror(errno));
996                 StrBufPrintf(IO->ErrMsg,
997                              "Failed to set socket flags: %s",
998                              strerror(errno));
999                 close(IO->SendBuf.fd);
1000                 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
1001                 return eAbort;
1002         }
1003 /* TODO: maye we could use offsetof() to calc the position of data...
1004  * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
1005  */
1006         ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
1007         IO->recv_event.data = IO;
1008         ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
1009         IO->send_event.data = IO;
1010
1011         ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
1012         IO->conn_fail.data = IO;
1013         ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
1014         IO->rw_timeout.data = IO;
1015
1016
1017
1018
1019         /* for debugging you may bypass it like this:
1020          * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
1021          * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
1022          *   inet_addr("127.0.0.1");
1023          */
1024         if (IO->ConnectMe->IPv6) {
1025                 rc = connect(IO->SendBuf.fd,
1026                              &IO->ConnectMe->Addr,
1027                              sizeof(struct sockaddr_in6));
1028         }
1029         else {
1030                 /* If citserver is bound to a specific IP address on the host, make
1031                  * sure we use that address for outbound connections.
1032                  */
1033         
1034                 memset(&egress_sin, 0, sizeof(egress_sin));
1035                 egress_sin.sin_family = AF_INET;
1036                 if (!IsEmptyStr(config.c_ip_addr)) {
1037                         egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
1038                         if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
1039                                 egress_sin.sin_addr.s_addr = INADDR_ANY;
1040                         }
1041
1042                         /* If this bind fails, no problem; we can still use INADDR_ANY */
1043                         bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
1044                 }
1045                 rc = connect(IO->SendBuf.fd,
1046                              (struct sockaddr_in *)&IO->ConnectMe->Addr,
1047                              sizeof(struct sockaddr_in));
1048         }
1049
1050         if (rc >= 0){
1051                 SetEVState(IO, eIOConnNow);
1052                 EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd);
1053                 set_start_callback(event_base, IO, 0);
1054                 return IO->NextState;
1055         }
1056         else if (errno == EINPROGRESS) {
1057                 SetEVState(IO, eIOConnWait);
1058                 EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd);
1059
1060                 ev_io_init(&IO->conn_event,
1061                            IO_connestd_callback,
1062                            IO->SendBuf.fd,
1063                            EV_READ|EV_WRITE);
1064
1065                 IO->conn_event.data = IO;
1066
1067                 ev_io_start(event_base, &IO->conn_event);
1068                 ev_timer_start(event_base, &IO->conn_fail);
1069                 return IO->NextState;
1070         }
1071         else {
1072                 SetEVState(IO, eIOConnfail);
1073                 ev_idle_init(&IO->conn_fail_immediate,
1074                              IO_connfailimmediate_callback);
1075                 IO->conn_fail_immediate.data = IO;
1076                 ev_idle_start(event_base, &IO->conn_fail_immediate);
1077
1078                 EV_syslog(LOG_ERR,
1079                           "connect() = %d failed: %s\n",
1080                           IO->SendBuf.fd,
1081                           strerror(errno));
1082
1083                 StrBufPrintf(IO->ErrMsg,
1084                              "Failed to connect: %s",
1085                              strerror(errno));
1086                 return IO->NextState;
1087         }
1088         return IO->NextState;
1089 }
1090
1091 void SetNextTimeout(AsyncIO *IO, double timeout)
1092 {
1093         IO->rw_timeout.repeat = timeout;
1094         ev_timer_again (event_base,  &IO->rw_timeout);
1095 }
1096
1097
1098 eNextState ReAttachIO(AsyncIO *IO,
1099                       void *pData,
1100                       int ReadFirst)
1101 {
1102         SetEVState(IO, eIOAttach);
1103         IO->Data = pData;
1104         become_session(IO->CitContext);
1105         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
1106         if (ReadFirst) {
1107                 IO->NextState = eReadMessage;
1108         }
1109         else {
1110                 IO->NextState = eSendReply;
1111         }
1112         set_start_callback(event_base, IO, 0);
1113
1114         return IO->NextState;
1115 }
1116
1117 void InitIOStruct(AsyncIO *IO,
1118                   void *Data,
1119                   eNextState NextState,
1120                   IO_LineReaderCallback LineReader,
1121                   IO_CallBack DNS_Fail,
1122                   IO_CallBack SendDone,
1123                   IO_CallBack ReadDone,
1124                   IO_CallBack Terminate,
1125                   IO_CallBack DBTerminate,
1126                   IO_CallBack ConnFail,
1127                   IO_CallBack Timeout,
1128                   IO_CallBack ShutdownAbort)
1129 {
1130         IO->Data          = Data;
1131
1132         IO->CitContext    = CloneContext(CC);
1133         IO->CitContext->session_specific_data = Data;
1134         IO->CitContext->IO = IO;
1135
1136         IO->NextState     = NextState;
1137
1138         IO->SendDone      = SendDone;
1139         IO->ReadDone      = ReadDone;
1140         IO->Terminate     = Terminate;
1141         IO->DBTerminate   = DBTerminate;
1142         IO->LineReader    = LineReader;
1143         IO->ConnFail      = ConnFail;
1144         IO->Timeout       = Timeout;
1145         IO->ShutdownAbort = ShutdownAbort;
1146
1147         IO->DNS.Fail      = DNS_Fail;
1148
1149         IO->SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
1150         IO->RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
1151         IO->IOBuf         = NewStrBuf();
1152         EV_syslog(LOG_DEBUG,
1153                   "EVENT: Session lives at %p IO at %p \n",
1154                   Data, IO);
1155
1156 }
1157
1158 extern int evcurl_init(AsyncIO *IO);
1159
1160 int InitcURLIOStruct(AsyncIO *IO,
1161                      void *Data,
1162                      const char* Desc,
1163                      IO_CallBack SendDone,
1164                      IO_CallBack Terminate,
1165                      IO_CallBack DBTerminate,
1166                      IO_CallBack ShutdownAbort)
1167 {
1168         IO->Data          = Data;
1169
1170         IO->CitContext    = CloneContext(CC);
1171         IO->CitContext->session_specific_data = Data;
1172         IO->CitContext->IO = IO;
1173
1174         IO->SendDone      = SendDone;
1175         IO->Terminate     = Terminate;
1176         IO->DBTerminate   = DBTerminate;
1177         IO->ShutdownAbort = ShutdownAbort;
1178
1179         strcpy(IO->HttpReq.errdesc, Desc);
1180
1181
1182         return  evcurl_init(IO);
1183
1184 }
1185
1186
1187 typedef struct KillOtherSessionContext {
1188         AsyncIO IO;
1189         AsyncIO *OtherOne;
1190 }KillOtherSessionContext;
1191
1192 eNextState KillTerminate(AsyncIO *IO)
1193 {
1194         long id;
1195         KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data;
1196         EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__);
1197         id = IO->ID;
1198         FreeAsyncIOContents(IO);
1199         memset(Ctx, 0, sizeof(KillOtherSessionContext));
1200         IO->ID = id; /* just for the case we want to analyze it in a coredump */
1201         free(Ctx);
1202         return eAbort;
1203
1204 }
1205
1206 eNextState KillShutdown(AsyncIO *IO)
1207 {
1208         return eTerminateConnection;
1209 }
1210
1211 eNextState KillOtherContextNow(AsyncIO *IO)
1212 {
1213         KillOtherSessionContext *Ctx = IO->Data;
1214
1215         SetEVState(IO, eKill);
1216
1217         if (Ctx->OtherOne->ShutdownAbort != NULL)
1218                 Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne);
1219         return eTerminateConnection;
1220 }
1221
1222 void KillAsyncIOContext(AsyncIO *IO)
1223 {
1224         KillOtherSessionContext *Ctx;
1225
1226         Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext));
1227         memset(Ctx, 0, sizeof(KillOtherSessionContext));
1228         
1229         InitIOStruct(&Ctx->IO,
1230                      Ctx,
1231                      eReadMessage,
1232                      NULL,
1233                      NULL,
1234                      NULL,
1235                      NULL,
1236                      KillTerminate,
1237                      NULL,
1238                      NULL,
1239                      NULL,
1240                      KillShutdown);
1241
1242         Ctx->OtherOne = IO;
1243
1244         switch(IO->NextState) {
1245         case eSendDNSQuery:
1246         case eReadDNSReply:
1247
1248         case eConnect:
1249         case eSendReply:
1250         case eSendMore:
1251         case eSendFile:
1252
1253         case eReadMessage:
1254         case eReadMore:
1255         case eReadPayload:
1256         case eReadFile:
1257                 IO->ReAttachCB = KillOtherContextNow;
1258                 QueueAnEventContext(&Ctx->IO);
1259                 break;
1260         case eDBQuery:
1261                 IO->ReAttachCB = KillOtherContextNow;
1262                 QueueAnDBOperation(&Ctx->IO);
1263                 break;
1264         case eTerminateConnection:
1265         case eAbort:
1266                 /*hm, its already dying, dunno which Queue its in... */
1267                 free(Ctx);
1268         }
1269         
1270 }
1271
1272 extern int DebugEventLoopBacktrace;
1273 void EV_backtrace(AsyncIO *IO)
1274 {
1275 #ifdef HAVE_BACKTRACE
1276         void *stack_frames[50];
1277         size_t size, i;
1278         char **strings;
1279
1280         if ((IO == NULL) || (DebugEventLoopBacktrace == 0))
1281                 return;
1282         size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
1283         strings = backtrace_symbols(stack_frames, size);
1284         for (i = 0; i < size; i++) {
1285                 if (strings != NULL) {
1286                         EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
1287                 }
1288                 else {
1289                         EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
1290                 }
1291         }
1292         free(strings);
1293 #endif
1294 }
1295
1296
1297 ev_tstamp ctdl_ev_now (void)
1298 {
1299         return ev_now(event_base);
1300 }