2 * Copyright (c) 1998-2012 by the citadel.org team
4 * This program is open source software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 3.
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
22 #include <sys/types.h>
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
47 #include <libcitadel.h>
50 #include "citserver.h"
57 #include "internet_addressing.h"
60 #include "clientsocket.h"
61 #include "locate_host.h"
62 #include "citadel_dirs.h"
64 #include "event_client.h"
65 #include "ctdl_module.h"
/*
 * Human-readable names of the event-loop states. SetEVState() indexes this
 * table with the state value and copies the selected name into the
 * lastcmdname field of the session context.
 * NOTE(review): the entries presumably have to stay in the same order as the
 * eIOState enumerators - confirm before adding or reordering entries.
 */
68 ConstStr IOStates[] = {
75 {HKEY("DB Terminate")},
78 {HKEY("IO Connect Socket")},
81 {HKEY("IO ConnFail")},
82 {HKEY("IO ConnFail Now")},
83 {HKEY("IO Conn Now")},
84 {HKEY("IO Conn Wait")},
87 {HKEY("Curl Shutdown")},
88 {HKEY("Curl More IO")},
89 {HKEY("Curl Got IO")},
90 {HKEY("Curl Got Data")},
91 {HKEY("Curl Got Status")},
92 {HKEY("C-Ares Start")},
93 {HKEY("C-Ares IO Done")},
94 {HKEY("C-Ares Finished")},
95 {HKEY("C-Ares exit")},
/*
 * Publish the current event-loop state of an IO: the name IOStates[State] is
 * copied into the lastcmdname field of the IO's CitContext. len + 1 bytes are
 * copied, presumably so the terminating NUL of the name is included.
 * NOTE(review): State is used as an index without a range check here.
 */
100 void SetEVState(AsyncIO *IO, eIOState State)
103 CitContext* CCC = IO->CitContext;
105 memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1);
110 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
111 static void IO_abort_shutdown_callback(struct ev_loop *loop,
116 /*------------------------------------------------------------------------------
118 *----------------------------------------------------------------------------*/
119 extern int evdb_count;
120 extern pthread_mutex_t DBEventQueueMutex;
121 extern pthread_mutex_t DBEventExitQueueMutex;
122 extern HashList *DBInboundEventQueue;
123 extern struct ev_loop *event_db;
124 extern ev_async DBAddJob;
125 extern ev_async DBExitEventLoop;
/*
 * Hand an IO over to the DB event loop.
 * A malloc()ed IOAddHandler is put into DBInboundEventQueue while holding
 * DBEventQueueMutex; afterwards the DB loop is signalled through the DBAddJob
 * async watcher while holding DBEventExitQueueMutex. The db_abort_by_shutdown
 * cleanup watcher is initialised with IO_abort_shutdown_callback and gets the
 * IO as its data pointer.
 * A NULL DBInboundEventQueue is treated as "shutting down"; a NULL event_db
 * is checked before the wake-up is sent.
 * NOTE(review): confirm the malloc() result is checked before it is used.
 */
127 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
132 SetEVState(IO, eDBQ);
133 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
136 ev_cleanup_init(&IO->db_abort_by_shutdown,
137 IO_abort_shutdown_callback);
138 IO->db_abort_by_shutdown.data = IO;
140 pthread_mutex_lock(&DBEventQueueMutex);
141 if (DBInboundEventQueue == NULL)
143 /* shutting down... */
145 EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
146 pthread_mutex_unlock(&DBEventQueueMutex);
149 EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
151 Put(DBInboundEventQueue, IKEY(i), h, NULL);
152 pthread_mutex_unlock(&DBEventQueueMutex);
154 pthread_mutex_lock(&DBEventExitQueueMutex);
155 if (event_db == NULL)
157 pthread_mutex_unlock(&DBEventExitQueueMutex);
/* wake the DB loop so it picks up the queued handler */
160 ev_async_send (event_db, &DBAddJob);
161 pthread_mutex_unlock(&DBEventExitQueueMutex);
163 EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
/*
 * Stop the watchers this IO has on the DB loop (event_db): the
 * db_abort_by_shutdown cleanup watcher and the db_unwind_stack idle watcher.
 */
167 void StopDBWatchers(AsyncIO *IO)
169 SetEVState(IO, eDBStop);
170 ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
171 ev_idle_stop(event_db, &IO->db_unwind_stack);
/*
 * Terminate an IO that currently lives in the DB queue: marks the state as
 * eDBTerm, logs the termination and requires the DBTerminate callback to be
 * set (asserted).
 */
174 void ShutDownDBCLient(AsyncIO *IO)
176 CitContext *Ctx =IO->CitContext;
179 SetEVState(IO, eDBTerm);
180 EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
183 assert(IO->DBTerminate);
/*
 * Idle-watcher callback on the DB loop; watcher->data is the AsyncIO.
 * Stops the db_unwind_stack idle watcher, switches to the IO's session and
 * runs IO->NextDBOperation (asserted to be set), then dispatches on the state
 * it returns. For eTerminateConnection the DB watchers are stopped and
 * ShutDownDBCLient() is called.
 */
188 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
190 AsyncIO *IO = watcher->data;
192 SetEVState(IO, eDBNext);
193 IO->Now = ev_now(event_db);
194 EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
195 become_session(IO->CitContext);
197 ev_idle_stop(event_db, &IO->db_unwind_stack);
199 assert(IO->NextDBOperation);
200 switch (IO->NextDBOperation(IO))
214 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
216 case eTerminateConnection:
218 ev_idle_stop(event_db, &IO->db_unwind_stack);
219 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
220 ShutDownDBCLient(IO);
/*
 * Schedule CB as the next DB step of this IO: CB is stored in
 * IO->NextDBOperation and the db_unwind_stack idle watcher is (re)initialised
 * and started on event_db with the IO as its data pointer.
 */
224 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
226 SetEVState(IO, eQDBNext);
227 IO->NextDBOperation = CB;
228 ev_idle_init(&IO->db_unwind_stack,
230 IO->db_unwind_stack.data = IO;
231 ev_idle_start(event_db, &IO->db_unwind_stack);
235 /*------------------------------------------------------------------------------
237 *----------------------------------------------------------------------------*/
238 extern int evbase_count;
239 extern pthread_mutex_t EventQueueMutex;
240 extern pthread_mutex_t EventExitQueueMutex;
241 extern HashList *InboundEventQueue;
242 extern struct ev_loop *event_base;
243 extern ev_async AddJob;
244 extern ev_async ExitEventLoop;
/*
 * Cleanup-watcher callback registered for abort_by_shutdown and
 * db_abort_by_shutdown; watcher->data is the AsyncIO. Invokes the IO's
 * ShutdownAbort callback, which is asserted to be set.
 * NOTE(review): IO->Now is taken from event_base even though this callback is
 * also installed on the DB watcher - confirm that is intended.
 */
246 static void IO_abort_shutdown_callback(struct ev_loop *loop,
250 AsyncIO *IO = watcher->data;
252 SetEVState(IO, eIOAbort);
253 EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
254 IO->Now = ev_now(event_base);
255 assert(IO->ShutdownAbort);
256 IO->ShutdownAbort(IO);
/*
 * Hand an IO over to the network event loop.
 * Same scheme as QueueDBOperation(): a malloc()ed IOAddHandler is put into
 * InboundEventQueue under EventQueueMutex, then the loop is signalled through
 * the AddJob async watcher under EventExitQueueMutex. The abort_by_shutdown
 * cleanup watcher is initialised with IO_abort_shutdown_callback.
 * A NULL InboundEventQueue is treated as "shutting down"; a NULL event_base
 * is checked before the wake-up is sent.
 * NOTE(review): confirm the malloc() result is checked before it is used.
 */
260 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
265 SetEVState(IO, eIOQ);
266 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
269 ev_cleanup_init(&IO->abort_by_shutdown,
270 IO_abort_shutdown_callback);
271 IO->abort_by_shutdown.data = IO;
273 pthread_mutex_lock(&EventQueueMutex);
274 if (InboundEventQueue == NULL)
277 /* shutting down... */
278 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
279 pthread_mutex_unlock(&EventQueueMutex);
282 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
284 Put(InboundEventQueue, IKEY(i), h, NULL);
285 pthread_mutex_unlock(&EventQueueMutex);
287 pthread_mutex_lock(&EventExitQueueMutex);
288 if (event_base == NULL) {
289 pthread_mutex_unlock(&EventExitQueueMutex);
/* wake the event loop so it picks up the queued handler */
292 ev_async_send (event_base, &AddJob);
293 pthread_mutex_unlock(&EventExitQueueMutex);
294 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
/*
 * Move an IO from the network loop to the DB loop: stops its client watchers
 * without closing the socket (CloseFD = 0), then queues CB via
 * QueueDBOperation() and returns that function's result.
 */
298 eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB)
300 StopClientWatchers(IO, 0);
301 return QueueDBOperation(IO, CB);
/*
 * Counterpart of EventQueueDBOperation(): queues CB on the network event loop
 * via QueueEventContext() and returns that function's result.
 */
303 eNextState DBQueueEventContext(AsyncIO *IO, IO_CallBack CB)
306 return QueueEventContext(IO, CB);
309 extern eNextState evcurl_handle_start(AsyncIO *IO);
/*
 * Queue an IO on the network event loop for a curl transfer. Works like
 * QueueEventContext(), except that the handler's attach function is fixed to
 * evcurl_handle_start instead of a caller-supplied callback.
 * NOTE(review): confirm the malloc() result is checked before it is used.
 */
311 eNextState QueueCurlContext(AsyncIO *IO)
316 SetEVState(IO, eCurlQ);
317 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
319 h->EvAttch = evcurl_handle_start;
321 pthread_mutex_lock(&EventQueueMutex);
322 if (InboundEventQueue == NULL)
324 /* shutting down... */
326 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
327 pthread_mutex_unlock(&EventQueueMutex);
331 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
333 Put(InboundEventQueue, IKEY(i), h, NULL);
334 pthread_mutex_unlock(&EventQueueMutex);
336 pthread_mutex_lock(&EventExitQueueMutex);
337 if (event_base == NULL) {
338 pthread_mutex_unlock(&EventExitQueueMutex);
/* wake the event loop so it picks up the queued handler */
341 ev_async_send (event_base, &AddJob);
342 pthread_mutex_unlock(&EventExitQueueMutex);
344 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
/*
 * Move a curl-driven IO to the DB loop: stops its curl watchers (which also
 * releases the curl handle, see StopCurlWatchers()), then queues CB via
 * QueueDBOperation() and returns that function's result.
 */
348 eNextState CurlQueueDBOperation(AsyncIO *IO, IO_CallBack CB)
350 StopCurlWatchers(IO);
351 return QueueDBOperation(IO, CB);
355 void DestructCAres(AsyncIO *IO);
/*
 * Release the resources owned by an AsyncIO (not the AsyncIO itself): the
 * IO/send/receive string buffers, the ConnectMe URL and the HTTP reply
 * buffer. The associated CitContext is set to CON_IDLE and detached from the
 * IO by clearing IO->CitContext.
 */
356 void FreeAsyncIOContents(AsyncIO *IO)
358 CitContext *Ctx = IO->CitContext;
360 FreeStrBuf(&IO->IOBuf);
361 FreeStrBuf(&IO->SendBuf.Buf);
362 FreeStrBuf(&IO->RecvBuf.Buf);
366 FreeURL(&IO->ConnectMe);
367 FreeStrBuf(&IO->HttpReq.ReplyData);
370 Ctx->state = CON_IDLE;
372 IO->CitContext = NULL;
/*
 * Stop every watcher of this IO on event_base: the rw_timeout and conn_fail
 * timers, the unwind_stack idle watcher, the abort_by_shutdown cleanup
 * watcher and the conn/send/recv io watchers.
 * If CloseFD is non-zero and SendBuf.fd is greater than 0, the socket is
 * closed as well.
 */
377 void StopClientWatchers(AsyncIO *IO, int CloseFD)
379 ev_timer_stop (event_base, &IO->rw_timeout);
380 ev_timer_stop(event_base, &IO->conn_fail);
381 ev_idle_stop(event_base, &IO->unwind_stack);
382 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
384 ev_io_stop(event_base, &IO->conn_event);
385 ev_io_stop(event_base, &IO->send_event);
386 ev_io_stop(event_base, &IO->recv_event);
388 if (CloseFD && (IO->SendBuf.fd > 0)) {
389 close(IO->SendBuf.fd);
/*
 * Stop all watchers of a curl-driven IO on event_base (same set as
 * StopClientWatchers()), release the curl easy handle and reset
 * HttpReq.chnd to NULL; a non-zero SendBuf.fd is closed.
 * NOTE(review): the fd test here is "!= 0" while StopClientWatchers() uses
 * "> 0" - confirm a negative fd can never reach this point.
 */
395 void StopCurlWatchers(AsyncIO *IO)
397 EVM_syslog(LOG_DEBUG, "EVENT StopCurlWatchers \n");
399 ev_timer_stop (event_base, &IO->rw_timeout);
400 ev_timer_stop(event_base, &IO->conn_fail);
401 ev_idle_stop(event_base, &IO->unwind_stack);
402 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
404 ev_io_stop(event_base, &IO->conn_event);
405 ev_io_stop(event_base, &IO->send_event);
406 ev_io_stop(event_base, &IO->recv_event);
408 curl_easy_cleanup(IO->HttpReq.chnd);
409 IO->HttpReq.chnd = NULL;
411 if (IO->SendBuf.fd != 0) {
412 close(IO->SendBuf.fd);
/*
 * Terminate an IO living on the network loop: stops all client watchers and
 * closes the socket (CloseFD = 1), destroys the c-ares channel if one exists
 * and stops the DNS io watchers, and requires the Terminate callback to be
 * set (asserted).
 */
418 void ShutDownCLient(AsyncIO *IO)
420 CitContext *Ctx =IO->CitContext;
422 SetEVState(IO, eExit);
425 EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
427 StopClientWatchers(IO, 1);
429 if (IO->DNS.Channel != NULL) {
430 ares_destroy(IO->DNS.Channel);
431 EV_DNS_LOG_STOP(DNS.recv_event);
432 EV_DNS_LOG_STOP(DNS.send_event);
433 ev_io_stop(event_base, &IO->DNS.recv_event);
434 ev_io_stop(event_base, &IO->DNS.send_event);
435 IO->DNS.Channel = NULL;
437 assert(IO->Terminate);
/*
 * Decide what to do after inbound data has been processed, based on
 * IO->NextState: the branches start the send watcher, run the SendDone
 * callback (asserted to be set) and dispatch again on the state it returns,
 * stop the client watchers without closing the socket, start the recv
 * watcher, or handle eTerminateConnection.
 */
441 void PostInbound(AsyncIO *IO)
443 switch (IO->NextState) {
445 ev_io_start(event_base, &IO->send_event);
449 assert(IO->SendDone);
450 IO->NextState = IO->SendDone(IO);
451 switch (IO->NextState)
460 ev_io_start(event_base, &IO->send_event);
463 StopClientWatchers(IO, 0);
471 ev_io_start(event_base, &IO->recv_event);
473 case eTerminateConnection:
/*
 * Consume what is already buffered for this IO.
 * Loops while the buffer is not empty and the IO is in one of the read
 * states (eReadMessage, eReadMore, eReadFile, eReadPayload). In eReadFile the
 * buffered bytes are handed to WriteIOBAlreadyRead(), and on eReadSuccess the
 * state becomes eSendReply. Otherwise a line is read, by IO->LineReader when
 * one is set and by StrBufChunkSipLine() when not.
 * Unless more data must be read from the socket, the recv watcher is stopped,
 * IO->ReadDone (asserted to be set) chooses the next state, and the receive
 * buffer is checked again.
 */
487 eReadState HandleInbound(AsyncIO *IO)
489 const char *Err = NULL;
490 eReadState Finished = eBufferNotEmpty;
492 become_session(IO->CitContext);
494 while ((Finished == eBufferNotEmpty) &&
495 ((IO->NextState == eReadMessage)||
496 (IO->NextState == eReadMore)||
497 (IO->NextState == eReadFile)||
498 (IO->NextState == eReadPayload)))
501 * lex line reply in callback,
502 * or do it ourselves.
503 * i.e. as nnn-blabla means continue reading in SMTP
505 if ((IO->NextState == eReadFile) &&
506 (Finished == eBufferNotEmpty))
508 Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
509 if (Finished == eReadSuccess)
511 IO->NextState = eSendReply;
514 else if (IO->LineReader)
515 Finished = IO->LineReader(IO);
517 Finished = StrBufChunkSipLine(IO->IOBuf,
521 case eMustReadMore: /// read new from socket...
523 case eBufferNotEmpty: /* shouldn't happen... */
524 case eReadSuccess: /// done for now...
526 case eReadFail: /// WHUT?
531 if (Finished != eMustReadMore) {
532 assert(IO->ReadDone);
533 ev_io_stop(event_base, &IO->recv_event);
534 IO->NextState = IO->ReadDone(IO);
535 Finished = StrBufCheckBuffer(&IO->RecvBuf);
/*
 * io-watcher callback for writable sockets; watcher->data is the AsyncIO.
 * Writes the next chunk, using FileSendChunked() for file transfers and
 * StrBuf_write_one_chunk_callback() otherwise. Once sending is complete the
 * send watcher is stopped and the function dispatches on IO->NextState,
 * running the SendDone / ReadDone callbacks (both asserted) and starting the
 * send or recv watcher again as required.
 * If the write fails with an errno other than EAGAIN, the watchers are
 * stopped, the socket is closed, an error text is stored in IO->ErrMsg and a
 * 0.01 s timeout is armed; with EAGAIN the function simply waits for the
 * next writable event.
 * The code writing to /tmp/foolog_ev_* is debug tracing.
 * NOTE(review): the fopen() result of the debug trace is used by fprintf()
 * and fwrite() - confirm it is checked for NULL.
 */
546 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
549 AsyncIO *IO = watcher->data;
550 const char *errmsg = NULL;
552 IO->Now = ev_now(event_base);
553 become_session(IO->CitContext);
559 const char *pch = ChrPtr(IO->SendBuf.Buf);
560 const char *pchh = IO->SendBuf.ReadWritePointer;
566 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
567 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
568 ((CitContext*)(IO->CitContext))->ServiceName,
571 fd = fopen(fn, "a+");
572 fprintf(fd, "Send: BufSize: %ld BufContent: [",
574 rv = fwrite(pchh, nbytes, 1, fd);
575 if (!rv) printf("failed to write debug to %s!\n", fn);
578 switch (IO->NextState) {
580 rc = FileSendChunked(&IO->IOB, &errmsg);
582 StrBufPlain(IO->ErrMsg, errmsg, -1);
585 rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd,
591 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
597 ev_io_stop(event_base, &IO->send_event);
598 switch (IO->NextState) {
600 assert(IO->SendDone);
601 IO->NextState = IO->SendDone(IO);
603 if ((IO->NextState == eTerminateConnection) ||
604 (IO->NextState == eAbort) )
607 ev_io_start(event_base, &IO->send_event);
611 if (IO->IOB.ChunkSendRemain > 0) {
612 ev_io_start(event_base, &IO->recv_event);
613 SetNextTimeout(IO, 100.0);
616 assert(IO->ReadDone);
617 IO->NextState = IO->ReadDone(IO);
618 switch(IO->NextState) {
627 ev_io_start(event_base,
635 case eTerminateConnection:
642 if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
644 IO->NextState = eReadMore;
649 if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
654 ev_io_start(event_base, &IO->recv_event);
660 * we now live in another queue,
661 * so we have to unregister.
663 ev_cleanup_stop(loop, &IO->abort_by_shutdown);
668 case eTerminateConnection:
674 if (errno != EAGAIN) {
675 StopClientWatchers(IO, 1);
677 "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n",
678 errno, strerror(errno), IO->SendBuf.fd);
679 StrBufPrintf(IO->ErrMsg,
680 "Socket Invalid! [%s]",
682 SetNextTimeout(IO, 0.01);
685 /* else : must write more. */
/*
 * Start regular I/O on a connected socket: the conn_fail timer is stopped,
 * the rw_timeout timer is started, and depending on IO->NextState either the
 * recv watcher is started (the ErrMsg is pre-loaded with
 * "[while waiting for greeting]") or IO_send_callback() is invoked directly
 * to begin sending.
 */
688 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
690 ev_timer_stop(event_base, &IO->conn_fail);
691 ev_timer_start(event_base, &IO->rw_timeout);
693 switch(IO->NextState) {
697 StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0);
698 ev_io_start(event_base, &IO->recv_event);
704 become_session(IO->CitContext);
705 IO_send_callback(loop, &IO->send_event, revents);
711 case eTerminateConnection:
/*
 * Timer callback for rw_timeout; watcher->data is the AsyncIO.
 * Stops the timer, and if a socket is open (SendBuf.fd != 0) stops the
 * send/recv watchers, closes it and resets both fds to 0. Then calls the
 * IO's Timeout callback and dispatches on the state it returns.
 */
719 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
721 AsyncIO *IO = watcher->data;
723 SetEVState(IO, eIOTimeout);
724 IO->Now = ev_now(event_base);
725 ev_timer_stop (event_base, &IO->rw_timeout);
726 become_session(IO->CitContext);
728 if (IO->SendBuf.fd != 0)
730 ev_io_stop(event_base, &IO->send_event);
731 ev_io_stop(event_base, &IO->recv_event);
732 ev_timer_stop (event_base, &IO->rw_timeout);
733 close(IO->SendBuf.fd);
734 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
738 switch (IO->Timeout(IO))
/*
 * Timer callback for conn_fail (connect did not complete in time); also
 * called directly by IO_connestd_callback() when the socket reports an error.
 * Stops the timer, and if a socket is open stops the conn/send/recv watchers
 * and the rw_timeout timer, closes it and resets both fds to 0. Then calls
 * the IO's ConnFail callback (asserted to be set) and dispatches on the
 * state it returns.
 */
748 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
750 AsyncIO *IO = watcher->data;
752 SetEVState(IO, eIOConnfail);
753 IO->Now = ev_now(event_base);
754 ev_timer_stop (event_base, &IO->conn_fail);
756 if (IO->SendBuf.fd != 0)
758 ev_io_stop(loop, &IO->conn_event);
759 ev_io_stop(event_base, &IO->send_event);
760 ev_io_stop(event_base, &IO->recv_event);
761 ev_timer_stop (event_base, &IO->rw_timeout);
762 close(IO->SendBuf.fd);
763 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
765 become_session(IO->CitContext);
767 assert(IO->ConnFail);
768 switch (IO->ConnFail(IO))
/*
 * Idle-watcher callback used when connect() failed right away (armed by
 * EvConnectSock() through conn_fail_immediate); watcher->data is the AsyncIO.
 * Stops the idle watcher, closes an open socket and resets both fds to 0,
 * then calls the IO's ConnFail callback (asserted to be set) and dispatches
 * on the state it returns.
 */
779 IO_connfailimmediate_callback(struct ev_loop *loop,
783 AsyncIO *IO = watcher->data;
785 SetEVState(IO, eIOConnfailNow);
786 IO->Now = ev_now(event_base);
787 ev_idle_stop (event_base, &IO->conn_fail_immediate);
789 if (IO->SendBuf.fd != 0)
791 close(IO->SendBuf.fd);
792 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
794 become_session(IO->CitContext);
796 assert(IO->ConnFail);
797 switch (IO->ConnFail(IO))
/*
 * io-watcher callback fired when a non-blocking connect() has finished;
 * watcher->data is the AsyncIO.
 * Stops the conn_event watcher and the conn_fail timer, then queries the
 * socket via getsockopt() for its pending error (so_err). A non-zero so_err
 * is routed to IO_connfail_callback(); otherwise regular I/O is started via
 * set_start_callback().
 * NOTE(review): "connect() succeeded." is logged before the socket error has
 * been examined, so it also appears for failed connects.
 * NOTE(review): a failing getsockopt() (err != 0) takes the success path -
 * confirm that is intended.
 */
808 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
810 AsyncIO *IO = watcher->data;
812 socklen_t lon = sizeof(so_err);
815 SetEVState(IO, eIOConnNow);
816 IO->Now = ev_now(event_base);
817 EVM_syslog(LOG_DEBUG, "connect() succeeded.\n");
819 ev_io_stop(loop, &IO->conn_event);
820 ev_timer_stop(event_base, &IO->conn_fail);
822 err = getsockopt(IO->SendBuf.fd,
828 if ((err == 0) && (so_err != 0))
830 EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n",
833 IO_connfail_callback(loop, &IO->conn_fail, revents);
838 EVM_syslog(LOG_DEBUG, "connect() succeeded\n");
839 set_start_callback(loop, IO, revents);
/*
 * io-watcher callback for readable sockets; watcher->data is the AsyncIO.
 * Reads the next chunk, using FileRecvChunked() for file transfers (switching
 * to eSendReply and stopping the recv watcher once ChunkSendRemain reaches 0)
 * and StrBuf_read_one_chunk_callback() otherwise.
 * nbytes == 0 stops the watchers, closes the socket and arms a 0.01 s
 * timeout. nbytes == -1 with an errno other than EAGAIN does the same and
 * additionally stores an error text in IO->ErrMsg.
 * The code writing to /tmp/foolog_ev_* is debug tracing.
 * NOTE(review): the fopen() result of the debug trace is used by fprintf()
 * and fwrite() - confirm it is checked for NULL.
 */
844 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
848 AsyncIO *IO = watcher->data;
850 IO->Now = ev_now(event_base);
851 switch (IO->NextState) {
853 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
855 StrBufPlain(IO->ErrMsg, errmsg, -1);
858 if (IO->IOB.ChunkSendRemain == 0)
860 IO->NextState = eSendReply;
861 assert(IO->ReadDone);
862 ev_io_stop(event_base, &IO->recv_event);
871 nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd,
883 const char *pch = ChrPtr(IO->RecvBuf.Buf);
884 const char *pchh = IO->RecvBuf.ReadWritePointer;
889 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
890 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
891 ((CitContext*)(IO->CitContext))->ServiceName,
894 fd = fopen(fn, "a+");
895 fprintf(fd, "Read: BufSize: %ld BufContent: [",
897 rv = fwrite(pchh, nbytes, 1, fd);
898 if (!rv) printf("failed to write debug to %s!\n", fn);
905 } else if (nbytes == 0) {
906 StopClientWatchers(IO, 1);
907 SetNextTimeout(IO, 0.01);
909 } else if (nbytes == -1) {
910 if (errno != EAGAIN) {
911 // FD is gone. kick it.
912 StopClientWatchers(IO, 1);
914 "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n",
915 errno, strerror(errno), IO->SendBuf.fd);
916 StrBufPrintf(IO->ErrMsg,
917 "Socket Invalid! [%s]",
919 SetNextTimeout(IO, 0.01);
/*
 * Idle-watcher callback run after a DNS lookup has finished; watcher->data is
 * the AsyncIO. Calls the query's PostDNS callback (asserted to be set) and
 * dispatches on the returned state; one branch invokes the IO's DNS.Fail
 * callback (asserted to be set) and dispatches on its result.
 */
926 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
928 AsyncIO *IO = watcher->data;
930 SetEVState(IO, eCaresFinished);
931 IO->Now = ev_now(event_base);
932 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
933 become_session(IO->CitContext);
934 assert(IO->DNS.Query->PostDNS);
935 switch (IO->DNS.Query->PostDNS(IO))
938 assert(IO->DNS.Fail);
939 switch (IO->DNS.Fail(IO)) {
941 //// StopClientWatchers(IO);
/*
 * Create a non-blocking socket for IO->ConnectMe and start connecting.
 *
 * Steps: choose the initial state (eReadMessage or eSendReply), create the
 * socket (PF_INET6 or PF_INET depending on ConnectMe->IPv6), switch it to
 * O_NONBLOCK, initialise the recv/send io watchers and the conn_fail /
 * rw_timeout timers, then call connect(). For IPv4 the socket is first bound
 * to config.c_ip_addr when that is set.
 *
 * Outcome of connect():
 *  - immediate success: regular I/O starts via set_start_callback();
 *  - EINPROGRESS: the conn_event watcher and the conn_fail timer are started
 *    and IO_connestd_callback() finishes the job later;
 *  - any other error: the conn_fail_immediate idle watcher is armed so the
 *    failure is reported from the event loop, and IO->ErrMsg is filled.
 * Failures of socket()/fcntl() store a message in IO->ErrMsg, close the
 * socket where one exists and reset both fds to 0.
 *
 * first_rw_timeout is the initial value of the rw_timeout timer; the
 * conn_fail timer is initialised from conn_timeout.
 * Returns IO->NextState on the connect paths.
 */
952 eNextState EvConnectSock(AsyncIO *IO,
954 double first_rw_timeout,
957 struct sockaddr_in egress_sin;
961 SetEVState(IO, eIOConnectSock);
962 become_session(IO->CitContext);
965 IO->NextState = eReadMessage;
968 IO->NextState = eSendReply;
971 IO->SendBuf.fd = IO->RecvBuf.fd =
973 (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
977 if (IO->SendBuf.fd < 0) {
979 "EVENT: socket() failed: %s\n",
982 StrBufPrintf(IO->ErrMsg,
983 "Failed to create socket: %s",
985 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
988 fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
991 "EVENT: unable to get socket %d flags! %s \n",
994 StrBufPrintf(IO->ErrMsg,
995 "Failed to get socket %d flags: %s",
998 close(IO->SendBuf.fd);
999 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
1002 fdflags = fdflags | O_NONBLOCK;
1003 if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
1006 "EVENT: unable to set socket %d nonblocking flags! %s \n",
1009 StrBufPrintf(IO->ErrMsg,
1010 "Failed to set socket flags: %s",
1012 close(IO->SendBuf.fd);
1013 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
1016 /* TODO: maybe we could use offsetof() to calc the position of data...
1017 * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
1019 ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
1020 IO->recv_event.data = IO;
1021 ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
1022 IO->send_event.data = IO;
1024 ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
1025 IO->conn_fail.data = IO;
1026 ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
1027 IO->rw_timeout.data = IO;
1032 /* for debugging you may bypass it like this:
1033 * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
1034 * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
1035 * inet_addr("127.0.0.1");
1037 if (IO->ConnectMe->IPv6) {
1038 rc = connect(IO->SendBuf.fd,
1039 &IO->ConnectMe->Addr,
1040 sizeof(struct sockaddr_in6));
1043 /* If citserver is bound to a specific IP address on the host, make
1044 * sure we use that address for outbound connections.
1047 memset(&egress_sin, 0, sizeof(egress_sin));
1048 egress_sin.sin_family = AF_INET;
1049 if (!IsEmptyStr(config.c_ip_addr)) {
1050 egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
/* inet_addr() reports an unparsable address as INADDR_NONE; the former
 * comparison against !INADDR_ANY (i.e. 1) never matched that value, so
 * the fallback to INADDR_ANY could not trigger. */
1051 if (egress_sin.sin_addr.s_addr == INADDR_NONE) {
1052 egress_sin.sin_addr.s_addr = INADDR_ANY;
1055 /* If this bind fails, no problem; we can still use INADDR_ANY */
1056 bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
1058 rc = connect(IO->SendBuf.fd,
1059 (struct sockaddr_in *)&IO->ConnectMe->Addr,
1060 sizeof(struct sockaddr_in));
1064 SetEVState(IO, eIOConnNow);
1065 EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd);
1066 set_start_callback(event_base, IO, 0);
1067 return IO->NextState;
1069 else if (errno == EINPROGRESS) {
1070 SetEVState(IO, eIOConnWait);
1071 EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd);
1073 ev_io_init(&IO->conn_event,
1074 IO_connestd_callback,
1078 IO->conn_event.data = IO;
1080 ev_io_start(event_base, &IO->conn_event);
1081 ev_timer_start(event_base, &IO->conn_fail);
1082 return IO->NextState;
1085 SetEVState(IO, eIOConnfail);
1086 ev_idle_init(&IO->conn_fail_immediate,
1087 IO_connfailimmediate_callback);
1088 IO->conn_fail_immediate.data = IO;
1089 ev_idle_start(event_base, &IO->conn_fail_immediate);
1092 "connect() = %d failed: %s\n",
1096 StrBufPrintf(IO->ErrMsg,
1097 "Failed to connect: %s",
1099 return IO->NextState;
1101 return IO->NextState;
/*
 * Re-arm the read/write timeout of an IO: stores timeout in the repeat field
 * of the rw_timeout timer and restarts the timer on event_base with
 * ev_timer_again().
 */
1104 void SetNextTimeout(AsyncIO *IO, double timeout)
1106 IO->rw_timeout.repeat = timeout;
1107 ev_timer_again (event_base, &IO->rw_timeout);
/*
 * Resume network I/O for an IO that already has a connected socket:
 * re-registers the abort_by_shutdown cleanup watcher on event_base, selects
 * the initial state (eReadMessage or eSendReply) and starts I/O through
 * set_start_callback(). Returns the resulting IO->NextState.
 */
1111 eNextState ReAttachIO(AsyncIO *IO,
1115 SetEVState(IO, eIOAttach);
1117 become_session(IO->CitContext);
1118 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
1120 IO->NextState = eReadMessage;
1123 IO->NextState = eSendReply;
1125 set_start_callback(event_base, IO, 0);
1127 return IO->NextState;
/*
 * Initialise an AsyncIO for socket-based communication.
 * Clones the current session context (CC) for the IO and links the two
 * (session_specific_data = Data, CitContext->IO = IO), stores the initial
 * state and all callbacks, and allocates the send/receive buffers (1024
 * bytes initial size each) and the IO line buffer.
 *
 * Callbacks stored: LineReader, DNS_Fail (as DNS.Fail), SendDone, ReadDone,
 * Terminate, DBTerminate, ConnFail, Timeout, ShutdownAbort. Several of them
 * are asserted to be non-NULL at the point of use elsewhere in this file.
 */
1130 void InitIOStruct(AsyncIO *IO,
1132 eNextState NextState,
1133 IO_LineReaderCallback LineReader,
1134 IO_CallBack DNS_Fail,
1135 IO_CallBack SendDone,
1136 IO_CallBack ReadDone,
1137 IO_CallBack Terminate,
1138 IO_CallBack DBTerminate,
1139 IO_CallBack ConnFail,
1140 IO_CallBack Timeout,
1141 IO_CallBack ShutdownAbort)
1145 IO->CitContext = CloneContext(CC);
1146 IO->CitContext->session_specific_data = Data;
1147 IO->CitContext->IO = IO;
1149 IO->NextState = NextState;
1151 IO->SendDone = SendDone;
1152 IO->ReadDone = ReadDone;
1153 IO->Terminate = Terminate;
1154 IO->DBTerminate = DBTerminate;
1155 IO->LineReader = LineReader;
1156 IO->ConnFail = ConnFail;
1157 IO->Timeout = Timeout;
1158 IO->ShutdownAbort = ShutdownAbort;
1160 IO->DNS.Fail = DNS_Fail;
1162 IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024);
1163 IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
1164 IO->IOBuf = NewStrBuf();
1165 EV_syslog(LOG_DEBUG,
1166 "EVENT: Session lives at %p IO at %p \n",
1171 extern int evcurl_init(AsyncIO *IO);
/*
 * Initialise an AsyncIO for a curl-driven HTTP request.
 * Clones the current session context (CC) for the IO and links the two,
 * stores the SendDone, Terminate, DBTerminate and ShutdownAbort callbacks,
 * copies Desc into HttpReq.errdesc and returns the result of evcurl_init().
 * NOTE(review): Desc is copied with strcpy(); confirm every caller passes a
 * string that fits into HttpReq.errdesc.
 */
1173 int InitcURLIOStruct(AsyncIO *IO,
1176 IO_CallBack SendDone,
1177 IO_CallBack Terminate,
1178 IO_CallBack DBTerminate,
1179 IO_CallBack ShutdownAbort)
1183 IO->CitContext = CloneContext(CC);
1184 IO->CitContext->session_specific_data = Data;
1185 IO->CitContext->IO = IO;
1187 IO->SendDone = SendDone;
1188 IO->Terminate = Terminate;
1189 IO->DBTerminate = DBTerminate;
1190 IO->ShutdownAbort = ShutdownAbort;
1192 strcpy(IO->HttpReq.errdesc, Desc);
1195 return evcurl_init(IO);
/*
 * Helper context used by KillAsyncIOContext(): it carries an AsyncIO of its
 * own (member IO) and a reference to the IO that is to be terminated
 * (member OtherOne).
 */
1200 typedef struct KillOtherSessionContext {
1203 }KillOtherSessionContext;
/*
 * Terminate callback of the helper IO created by KillAsyncIOContext():
 * frees the IO's contents and wipes the KillOtherSessionContext found in
 * IO->Data. The IO's ID is written back after the wipe so it is still
 * available when examining a core dump.
 */
1205 eNextState KillTerminate(AsyncIO *IO)
1208 KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data;
1209 EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__);
1211 FreeAsyncIOContents(IO);
1212 memset(Ctx, 0, sizeof(KillOtherSessionContext));
1213 IO->ID = id; /* just for the case we want to analyze it in a coredump */
/*
 * Callback for the helper IO that needs no work of its own: always requests
 * termination by returning eTerminateConnection.
 */
1219 eNextState KillShutdown(AsyncIO *IO)
1221 return eTerminateConnection;
/*
 * Runs inside the queue of the IO that is to be killed: calls the target's
 * ShutdownAbort callback if it has one, then asks for the helper IO itself
 * to be terminated by returning eTerminateConnection.
 */
1224 eNextState KillOtherContextNow(AsyncIO *IO)
1226 KillOtherSessionContext *Ctx = IO->Data;
1228 SetEVState(IO, eKill);
1230 if (Ctx->OtherOne->ShutdownAbort != NULL)
1231 Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne);
1232 return eTerminateConnection;
/*
 * Ask another IO to abort. A zeroed KillOtherSessionContext is allocated and
 * its embedded IO initialised; depending on the target's current NextState
 * the helper is queued with KillOtherContextNow either on the network event
 * loop (QueueEventContext) or on the DB loop (QueueDBOperation), so the
 * abort runs in the queue the target lives in. For a target that is already
 * terminating the queue is unknown (see the comment in that branch).
 * NOTE(review): the malloc() result is passed straight to memset(); confirm
 * allocation failure is handled.
 */
1235 void KillAsyncIOContext(AsyncIO *IO)
1237 KillOtherSessionContext *Ctx;
1239 Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext));
1240 memset(Ctx, 0, sizeof(KillOtherSessionContext));
1242 InitIOStruct(&Ctx->IO,
1257 switch(IO->NextState) {
1270 QueueEventContext(&Ctx->IO, KillOtherContextNow);
1273 QueueDBOperation(&Ctx->IO, KillOtherContextNow);
1275 case eTerminateConnection:
1277 /*hm, its already dying, dunno which Queue its in... */
1283 extern int DebugEventLoopBacktrace;
/*
 * Log the current call stack (up to 50 frames) at LOG_ALERT for the given IO.
 * Only active when built with HAVE_BACKTRACE; does nothing when IO is NULL or
 * DebugEventLoopBacktrace is 0. Frames are printed symbolically when
 * backtrace_symbols() returned a table, and as raw addresses otherwise.
 */
1284 void EV_backtrace(AsyncIO *IO)
1286 #ifdef HAVE_BACKTRACE
1287 void *stack_frames[50];
1291 if ((IO == NULL) || (DebugEventLoopBacktrace == 0))
1293 size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
1294 strings = backtrace_symbols(stack_frames, size);
1295 for (i = 0; i < size; i++) {
1296 if (strings != NULL) {
1297 EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
1300 EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
/*
 * Return the network event loop's notion of the current time, i.e.
 * ev_now() of event_base.
 */
1308 ev_tstamp ctdl_ev_now (void)
1310 return ev_now(event_base);