2 * Copyright (c) 1998-2012 by the citadel.org team
4 * This program is open source software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 3.
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
22 #include <sys/types.h>
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
47 #include <libcitadel.h>
50 #include "citserver.h"
57 #include "internet_addressing.h"
60 #include "clientsocket.h"
61 #include "locate_host.h"
62 #include "citadel_dirs.h"
64 #include "event_client.h"
65 #include "ctdl_module.h"
/*
 * Human-readable names of the event-loop I/O states.
 * SetEVState() indexes this table with an eIOState value, so the entry order
 * presumably has to match the eIOState enum declared elsewhere -- TODO confirm.
 * NOTE(review): "Curl Shotdown" looks like a typo for "Curl Shutdown"; it is a
 * runtime string, so it is deliberately left unchanged here.
 */
68 ConstStr IOStates[] = {
75 {HKEY("DB Terminate")},
78 {HKEY("IO Connect Socket")},
81 {HKEY("IO ConnFail")},
82 {HKEY("IO ConnFail Now")},
83 {HKEY("IO Conn Now")},
84 {HKEY("IO Conn Wait")},
87 {HKEY("Curl Shotdown")},
88 {HKEY("Curl More IO")},
89 {HKEY("Curl Got IO")},
90 {HKEY("Curl Got Data")},
91 {HKEY("Curl Got Status")},
92 {HKEY("C-Ares Start")},
93 {HKEY("C-Ares IO Done")},
94 {HKEY("C-Ares Finished")},
95 {HKEY("C-Ares exit")},
/*
 * Record the current I/O state of an AsyncIO in its session context.
 * Copies the name of State from IOStates[] into CCC->lastcmdname; len + 1
 * bytes are copied, presumably to include the terminating NUL.
 * NOTE(review): State is used as an array index; no bounds check and no NULL
 * check of IO->CitContext is visible here -- confirm against the full source.
 */
100 void SetEVState(AsyncIO *IO, eIOState State)
103 CitContext* CCC = IO->CitContext;
105 memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1);
/* Forward declarations of watcher callbacks defined further down. */
110 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
111 static void IO_abort_shutdown_callback(struct ev_loop *loop,
116 /*------------------------------------------------------------------------------
118 *----------------------------------------------------------------------------*/
/*
 * State of the database event loop, defined in another translation unit.
 * As used below: DBInboundEventQueue is accessed under DBEventQueueMutex, and
 * event_db / DBAddJob are used under DBEventExitQueueMutex.
 */
119 extern int evdb_count;
120 extern pthread_mutex_t DBEventQueueMutex;
121 extern pthread_mutex_t DBEventExitQueueMutex;
122 extern HashList *DBInboundEventQueue;
123 extern struct ev_loop *event_db;
124 extern ev_async DBAddJob;
125 extern ev_async DBExitEventLoop;
/*
 * Queue a callback for execution on the database event loop.
 * Allocates an IOAddHandler, stores it in DBInboundEventQueue while holding
 * DBEventQueueMutex, then wakes the DB loop with ev_async_send() on DBAddJob
 * while holding DBEventExitQueueMutex.
 * NOTE(review): the statements that fill in `h`, compute the key `i` and
 * return are not visible here; presumably `h` carries IO and CB -- confirm.
 * NOTE(review): no check of the malloc() result is visible.
 */
127 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
132 SetEVState(IO, eDBQ);
133 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
/* Prepare the cleanup watcher whose callback aborts this IO (IO_abort_shutdown_callback). */
136 ev_cleanup_init(&IO->db_abort_by_shutdown,
137 IO_abort_shutdown_callback);
138 IO->db_abort_by_shutdown.data = IO;
140 pthread_mutex_lock(&DBEventQueueMutex);
/* A NULL queue means the DB event loop is already shutting down. */
141 if (DBInboundEventQueue == NULL)
143 /* shutting down... */
145 EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
146 pthread_mutex_unlock(&DBEventQueueMutex);
149 EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
151 Put(DBInboundEventQueue, IKEY(i), h, NULL);
152 pthread_mutex_unlock(&DBEventQueueMutex);
/* Only signal the loop while it still exists. */
154 pthread_mutex_lock(&DBEventExitQueueMutex);
155 if (event_db == NULL)
157 pthread_mutex_unlock(&DBEventExitQueueMutex);
160 ev_async_send (event_db, &DBAddJob);
161 pthread_mutex_unlock(&DBEventExitQueueMutex);
163 EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
/*
 * Stop the watchers this IO has registered on the DB loop (event_db):
 * the db_abort_by_shutdown cleanup watcher and the db_unwind_stack idle watcher.
 */
167 void StopDBWatchers(AsyncIO *IO)
169 SetEVState(IO, eDBStop);
170 ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
171 ev_idle_stop(event_db, &IO->db_unwind_stack);
/*
 * Terminate an IO that currently lives in the DB loop: sets state eDBTerm,
 * logs, and requires a DBTerminate callback to be set (asserted).
 * NOTE(review): the invocation of IO->DBTerminate and the use of Ctx are not
 * visible here -- confirm against the full source.
 */
174 void ShutDownDBCLient(AsyncIO *IO)
176 CitContext *Ctx =IO->CitContext;
179 SetEVState(IO, eDBTerm);
180 EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
183 assert(IO->DBTerminate);
/*
 * Idle-watcher callback on the DB loop: runs the operation stored in
 * IO->NextDBOperation and dispatches on the eNextState it returns.
 * watcher->data must point to the owning AsyncIO.
 * NOTE(review): most case labels of the switch are not visible here.
 */
188 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
190 AsyncIO *IO = watcher->data;
192 SetEVState(IO, eDBNext);
193 IO->Now = ev_now(event_db);
194 EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
195 become_session(IO->CitContext);
/* Stop the idle watcher before running the queued operation. */
197 ev_idle_stop(event_db, &IO->db_unwind_stack);
199 assert(IO->NextDBOperation);
200 switch (IO->NextDBOperation(IO))
214 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
/* Termination: stop the DB watchers and shut the DB client down. */
216 case eTerminateConnection:
218 ev_idle_stop(event_db, &IO->db_unwind_stack);
219 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
220 ShutDownDBCLient(IO);
/*
 * Schedule CB as the next DB operation of this IO: stores it in
 * IO->NextDBOperation and starts the db_unwind_stack idle watcher on event_db.
 * NOTE(review): the callback passed to ev_idle_init() is not visible here;
 * presumably DB_PerformNext -- confirm. The return value is not visible either.
 */
224 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
226 SetEVState(IO, eQDBNext);
227 IO->NextDBOperation = CB;
228 ev_idle_init(&IO->db_unwind_stack,
230 IO->db_unwind_stack.data = IO;
231 ev_idle_start(event_db, &IO->db_unwind_stack);
235 /*------------------------------------------------------------------------------
237 *----------------------------------------------------------------------------*/
/*
 * State of the client I/O event loop, defined in another translation unit.
 * As used below: InboundEventQueue is accessed under EventQueueMutex, and
 * event_base / AddJob are used under EventExitQueueMutex.
 */
238 extern int evbase_count;
239 extern pthread_mutex_t EventQueueMutex;
240 extern pthread_mutex_t EventExitQueueMutex;
241 extern HashList *InboundEventQueue;
242 extern struct ev_loop *event_base;
243 extern ev_async AddJob;
244 extern ev_async ExitEventLoop;
/*
 * Cleanup-watcher callback installed on abort_by_shutdown and
 * db_abort_by_shutdown: forwards to IO->ShutdownAbort(IO), which must be set
 * (asserted). watcher->data must point to the owning AsyncIO.
 */
246 static void IO_abort_shutdown_callback(struct ev_loop *loop,
250 AsyncIO *IO = watcher->data;
252 SetEVState(IO, eIOAbort);
253 EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
254 IO->Now = ev_now(event_base);
255 assert(IO->ShutdownAbort);
256 IO->ShutdownAbort(IO);
/*
 * Queue a callback for execution on the client I/O event loop.
 * Allocates an IOAddHandler, stores it in InboundEventQueue while holding
 * EventQueueMutex, then wakes the loop with ev_async_send() on AddJob while
 * holding EventExitQueueMutex.
 * NOTE(review): the statements that fill in `h`, compute the key `i` and
 * return are not visible here; presumably `h` carries IO and CB -- confirm.
 * NOTE(review): no check of the malloc() result is visible.
 */
260 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
265 SetEVState(IO, eIOQ);
266 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
/* Prepare the cleanup watcher whose callback aborts this IO (IO_abort_shutdown_callback). */
269 ev_cleanup_init(&IO->abort_by_shutdown,
270 IO_abort_shutdown_callback);
271 IO->abort_by_shutdown.data = IO;
273 pthread_mutex_lock(&EventQueueMutex);
/* A NULL queue means the event loop is already shutting down. */
274 if (InboundEventQueue == NULL)
277 /* shutting down... */
278 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
279 pthread_mutex_unlock(&EventQueueMutex);
282 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
284 Put(InboundEventQueue, IKEY(i), h, NULL);
285 pthread_mutex_unlock(&EventQueueMutex);
/* Only signal the loop while it still exists. */
287 pthread_mutex_lock(&EventExitQueueMutex);
288 if (event_base == NULL) {
289 pthread_mutex_unlock(&EventExitQueueMutex);
292 ev_async_send (event_base, &AddJob);
293 pthread_mutex_unlock(&EventExitQueueMutex);
294 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
/* Entry point of the curl integration; defined in another translation unit. */
298 extern eNextState evcurl_handle_start(AsyncIO *IO);
/*
 * Queue an IO on the client I/O event loop with evcurl_handle_start as the
 * handler to attach. Same queueing protocol as QueueEventContext():
 * InboundEventQueue under EventQueueMutex, then ev_async_send() on AddJob
 * under EventExitQueueMutex.
 * NOTE(review): no check of the malloc() result is visible; the computation
 * of the key `i` and the return statements are not visible here.
 */
300 eNextState QueueCurlContext(AsyncIO *IO)
305 SetEVState(IO, eCurlQ);
306 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
308 h->EvAttch = evcurl_handle_start;
310 pthread_mutex_lock(&EventQueueMutex);
/* A NULL queue means the event loop is already shutting down. */
311 if (InboundEventQueue == NULL)
313 /* shutting down... */
315 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
316 pthread_mutex_unlock(&EventQueueMutex);
320 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
322 Put(InboundEventQueue, IKEY(i), h, NULL);
323 pthread_mutex_unlock(&EventQueueMutex);
/* Only signal the loop while it still exists. */
325 pthread_mutex_lock(&EventExitQueueMutex);
326 if (event_base == NULL) {
327 pthread_mutex_unlock(&EventExitQueueMutex);
330 ev_async_send (event_base, &AddJob);
331 pthread_mutex_unlock(&EventExitQueueMutex);
333 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
337 void DestructCAres(AsyncIO *IO);
/*
 * Release what an AsyncIO owns: IOBuf, SendBuf.Buf, RecvBuf.Buf, the
 * ConnectMe URL and HttpReq.ReplyData. The session context is then set to
 * CON_IDLE and detached from the IO (IO->CitContext = NULL).
 * NOTE(review): further statements (e.g. a call to DestructCAres, a NULL
 * check of Ctx) may exist on lines not visible here -- confirm.
 */
338 void FreeAsyncIOContents(AsyncIO *IO)
340 CitContext *Ctx = IO->CitContext;
342 FreeStrBuf(&IO->IOBuf);
343 FreeStrBuf(&IO->SendBuf.Buf);
344 FreeStrBuf(&IO->RecvBuf.Buf);
348 FreeURL(&IO->ConnectMe);
349 FreeStrBuf(&IO->HttpReq.ReplyData);
352 Ctx->state = CON_IDLE;
354 IO->CitContext = NULL;
/*
 * Stop every watcher this IO has on event_base (timers, idle, cleanup and the
 * three ev_io watchers). If CloseFD is non-zero and SendBuf.fd is positive,
 * the socket is closed as well.
 * NOTE(review): whether the fd fields are reset after close() is not visible here.
 */
359 void StopClientWatchers(AsyncIO *IO, int CloseFD)
361 ev_timer_stop (event_base, &IO->rw_timeout);
362 ev_timer_stop(event_base, &IO->conn_fail);
363 ev_idle_stop(event_base, &IO->unwind_stack);
364 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
366 ev_io_stop(event_base, &IO->conn_event);
367 ev_io_stop(event_base, &IO->send_event);
368 ev_io_stop(event_base, &IO->recv_event);
370 if (CloseFD && (IO->SendBuf.fd > 0)) {
371 close(IO->SendBuf.fd);
/*
 * Stop every watcher this IO has on event_base and close SendBuf.fd when it
 * is non-zero.
 * NOTE(review): StopClientWatchers() tests `fd > 0`, this function tests
 * `fd != 0`, so a negative fd would be passed to close() here -- confirm
 * whether the difference is intended.
 */
377 void StopCurlWatchers(AsyncIO *IO)
379 ev_timer_stop (event_base, &IO->rw_timeout);
380 ev_timer_stop(event_base, &IO->conn_fail);
381 ev_idle_stop(event_base, &IO->unwind_stack);
382 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
384 ev_io_stop(event_base, &IO->conn_event);
385 ev_io_stop(event_base, &IO->send_event);
386 ev_io_stop(event_base, &IO->recv_event);
388 if (IO->SendBuf.fd != 0) {
389 close(IO->SendBuf.fd);
/*
 * Terminate an IO living in the client I/O loop: stops all client watchers
 * (closing the socket), tears down the c-ares channel and its DNS watchers if
 * one exists, and requires a Terminate callback to be set (asserted).
 * NOTE(review): the invocation of IO->Terminate and the use of Ctx are not
 * visible here -- confirm against the full source.
 */
395 void ShutDownCLient(AsyncIO *IO)
397 CitContext *Ctx =IO->CitContext;
399 SetEVState(IO, eExit);
402 EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
404 StopClientWatchers(IO, 1);
/* Release the resolver channel and stop its watchers. */
406 if (IO->DNS.Channel != NULL) {
407 ares_destroy(IO->DNS.Channel);
408 EV_DNS_LOG_STOP(DNS.recv_event);
409 EV_DNS_LOG_STOP(DNS.send_event);
410 ev_io_stop(event_base, &IO->DNS.recv_event);
411 ev_io_stop(event_base, &IO->DNS.send_event);
412 IO->DNS.Channel = NULL;
414 assert(IO->Terminate);
/*
 * Act on IO->NextState after inbound data has been processed: depending on
 * the state, (re)start the send or receive watcher, call IO->SendDone() and
 * dispatch on its result, or stop the client watchers.
 * NOTE(review): nearly all case labels are on lines not visible here, so the
 * exact mapping of states to actions must be confirmed against the full source.
 */
418 void PostInbound(AsyncIO *IO)
420 switch (IO->NextState) {
422 ev_io_start(event_base, &IO->send_event);
426 assert(IO->SendDone);
427 IO->NextState = IO->SendDone(IO);
428 switch (IO->NextState)
437 ev_io_start(event_base, &IO->send_event);
440 StopClientWatchers(IO, 0);
448 ev_io_start(event_base, &IO->recv_event);
450 case eTerminateConnection:
/*
 * Consume buffered inbound data for this IO.
 * Loops while the buffer is not empty and the IO is in one of the read states
 * (eReadMessage, eReadMore, eReadFile, eReadPayload). In eReadFile state the
 * data goes through WriteIOBAlreadyRead(); otherwise a line is extracted by
 * IO->LineReader if set, else by StrBufChunkSipLine(). Whenever the result is
 * not eMustReadMore, the receive watcher is stopped and IO->ReadDone() decides
 * the next state.
 * NOTE(review): the return statement and what follows the loop are not visible here.
 */
464 eReadState HandleInbound(AsyncIO *IO)
466 const char *Err = NULL;
467 eReadState Finished = eBufferNotEmpty;
469 become_session(IO->CitContext);
471 while ((Finished == eBufferNotEmpty) &&
472 ((IO->NextState == eReadMessage)||
473 (IO->NextState == eReadMore)||
474 (IO->NextState == eReadFile)||
475 (IO->NextState == eReadPayload)))
478 * lex line reply in callback,
479 * or do it ourselves.
480 * i.e. as nnn-blabla means continue reading in SMTP
482 if ((IO->NextState == eReadFile) &&
483 (Finished == eBufferNotEmpty))
485 Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
/* File fully received: switch over to sending the reply. */
486 if (Finished == eReadSuccess)
488 IO->NextState = eSendReply;
491 else if (IO->LineReader)
492 Finished = IO->LineReader(IO);
494 Finished = StrBufChunkSipLine(IO->IOBuf,
498 case eMustReadMore: /// read new from socket...
500 case eBufferNotEmpty: /* shouldn't happen... */
501 case eReadSuccess: /// done for now...
503 case eReadFail: /// WHUT?
508 if (Finished != eMustReadMore) {
509 assert(IO->ReadDone);
510 ev_io_stop(event_base, &IO->recv_event);
511 IO->NextState = IO->ReadDone(IO);
/* Re-check the receive buffer so the loop can continue with leftover data. */
512 Finished = StrBufCheckBuffer(&IO->RecvBuf);
/*
 * ev_io callback for a writable socket: writes the next chunk of pending
 * output (FileSendChunked() or StrBuf_write_one_chunk_callback(), depending
 * on IO->NextState) and then advances the state machine via IO->SendDone() /
 * IO->ReadDone(), restarting the send or receive watcher as required.
 * On a write error other than EAGAIN the client watchers are stopped, the
 * socket is closed, IO->ErrMsg is set and a 0.01 s timeout is armed.
 * watcher->data must point to the owning AsyncIO.
 * NOTE(review): most case labels and the conditions around `rc` are on lines
 * not visible here -- confirm the control flow against the full source.
 */
523 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
526 AsyncIO *IO = watcher->data;
527 const char *errmsg = NULL;
529 IO->Now = ev_now(event_base);
530 become_session(IO->CitContext);
/*
 * Debug dump of the outgoing buffer to a file under /tmp.
 * NOTE(review): presumably compiled only in a debug build (guard not visible
 * here); the fopen() result is not checked in the visible lines.
 */
536 const char *pch = ChrPtr(IO->SendBuf.Buf);
537 const char *pchh = IO->SendBuf.ReadWritePointer;
543 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
544 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
545 ((CitContext*)(IO->CitContext))->ServiceName,
548 fd = fopen(fn, "a+");
549 fprintf(fd, "Send: BufSize: %ld BufContent: [",
551 rv = fwrite(pchh, nbytes, 1, fd);
552 if (!rv) printf("failed to write debug to %s!\n", fn);
/* Perform the actual write for the current state. */
555 switch (IO->NextState) {
557 rc = FileSendChunked(&IO->IOB, &errmsg);
559 StrBufPlain(IO->ErrMsg, errmsg, -1);
562 rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd,
568 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
/* Stop the send watcher, then decide what to do next. */
574 ev_io_stop(event_base, &IO->send_event);
575 switch (IO->NextState) {
577 assert(IO->SendDone);
578 IO->NextState = IO->SendDone(IO);
580 if ((IO->NextState == eTerminateConnection) ||
581 (IO->NextState == eAbort) )
584 ev_io_start(event_base, &IO->send_event);
588 if (IO->IOB.ChunkSendRemain > 0) {
589 ev_io_start(event_base, &IO->recv_event);
590 SetNextTimeout(IO, 100.0);
593 assert(IO->ReadDone);
594 IO->NextState = IO->ReadDone(IO);
595 switch(IO->NextState) {
604 ev_io_start(event_base,
612 case eTerminateConnection:
619 if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
621 IO->NextState = eReadMore;
626 if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
631 ev_io_start(event_base, &IO->recv_event);
637 * we now live in another queue,
638 * so we have to unregister.
640 ev_cleanup_stop(loop, &IO->abort_by_shutdown);
645 case eTerminateConnection:
/* Hard write error: drop the connection and let the timeout handler run soon. */
651 if (errno != EAGAIN) {
652 StopClientWatchers(IO, 1);
654 "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n",
655 errno, strerror(errno), IO->SendBuf.fd);
656 StrBufPrintf(IO->ErrMsg,
657 "Socket Invalid! [%s]",
659 SetNextTimeout(IO, 0.01);
662 /* else : must write more. */
/*
 * Start regular I/O on a connected socket: stops the connect-failure timer,
 * starts the read/write timeout and, depending on IO->NextState, either
 * starts the receive watcher or calls IO_send_callback() directly.
 * NOTE(review): most case labels are on lines not visible here.
 */
665 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
667 ev_timer_stop(event_base, &IO->conn_fail);
668 ev_timer_start(event_base, &IO->rw_timeout);
670 switch(IO->NextState) {
/* Pre-load the error text in case the read phase fails while waiting for the greeting. */
674 StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0);
675 ev_io_start(event_base, &IO->recv_event);
681 become_session(IO->CitContext);
682 IO_send_callback(loop, &IO->send_event, revents);
688 case eTerminateConnection:
/*
 * Timer callback for the read/write timeout: stops the I/O watchers, closes
 * the socket (fd value 0 is used as the "no socket" marker) and lets
 * IO->Timeout() decide how to continue.
 * watcher->data must point to the owning AsyncIO.
 * NOTE(review): rw_timeout is stopped twice in the visible lines; the second
 * call looks redundant. The cases of the final switch are not visible here.
 */
696 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
698 AsyncIO *IO = watcher->data;
700 SetEVState(IO, eIOTimeout);
701 IO->Now = ev_now(event_base);
702 ev_timer_stop (event_base, &IO->rw_timeout);
703 become_session(IO->CitContext);
705 if (IO->SendBuf.fd != 0)
707 ev_io_stop(event_base, &IO->send_event);
708 ev_io_stop(event_base, &IO->recv_event);
709 ev_timer_stop (event_base, &IO->rw_timeout);
710 close(IO->SendBuf.fd);
711 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
715 switch (IO->Timeout(IO))
/*
 * Timer callback for a connect attempt that failed or timed out: stops all
 * socket watchers and timers, closes the socket (resetting both fd fields to
 * 0) and lets IO->ConnFail() decide how to continue (must be set; asserted).
 * watcher->data must point to the owning AsyncIO.
 * NOTE(review): the cases of the final switch are not visible here.
 */
725 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
727 AsyncIO *IO = watcher->data;
729 SetEVState(IO, eIOConnfail);
730 IO->Now = ev_now(event_base);
731 ev_timer_stop (event_base, &IO->conn_fail);
733 if (IO->SendBuf.fd != 0)
735 ev_io_stop(loop, &IO->conn_event);
736 ev_io_stop(event_base, &IO->send_event);
737 ev_io_stop(event_base, &IO->recv_event);
738 ev_timer_stop (event_base, &IO->rw_timeout);
739 close(IO->SendBuf.fd);
740 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
742 become_session(IO->CitContext);
744 assert(IO->ConnFail);
745 switch (IO->ConnFail(IO))
/*
 * Idle-watcher callback used when connect() failed right away (started by
 * EvConnectSock via conn_fail_immediate): stops the idle watcher, closes the
 * socket and lets IO->ConnFail() decide how to continue (must be set; asserted).
 * watcher->data must point to the owning AsyncIO.
 * NOTE(review): the cases of the final switch are not visible here.
 */
756 IO_connfailimmediate_callback(struct ev_loop *loop,
760 AsyncIO *IO = watcher->data;
762 SetEVState(IO, eIOConnfailNow);
763 IO->Now = ev_now(event_base);
764 ev_idle_stop (event_base, &IO->conn_fail_immediate);
766 if (IO->SendBuf.fd != 0)
768 close(IO->SendBuf.fd);
769 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
771 become_session(IO->CitContext);
773 assert(IO->ConnFail);
774 switch (IO->ConnFail(IO))
/*
 * ev_io callback fired when a non-blocking connect() has completed.
 * Stops the connect watcher and the connect-failure timer, then queries the
 * socket with getsockopt(); when the query succeeds and reports a pending
 * error, the failure path (IO_connfail_callback) is taken, otherwise regular
 * I/O is started through set_start_callback().
 * watcher->data must point to the owning AsyncIO.
 * NOTE(review): the first "connect() succeeded." message is logged before the
 * socket error has been checked.
 * NOTE(review): as far as visible, a failing getsockopt() (err != 0) is not
 * treated as a connection failure -- confirm whether that is intended.
 */
785 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
787 AsyncIO *IO = watcher->data;
789 socklen_t lon = sizeof(so_err);
792 SetEVState(IO, eIOConnNow);
793 IO->Now = ev_now(event_base);
794 EVM_syslog(LOG_DEBUG, "connect() succeeded.\n");
796 ev_io_stop(loop, &IO->conn_event);
797 ev_timer_stop(event_base, &IO->conn_fail);
799 err = getsockopt(IO->SendBuf.fd,
805 if ((err == 0) && (so_err != 0))
807 EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n",
810 IO_connfail_callback(loop, &IO->conn_fail, revents);
815 EVM_syslog(LOG_DEBUG, "connect() succeeded\n");
816 set_start_callback(loop, IO, revents);
/*
 * ev_io callback for a readable socket: reads the next chunk
 * (FileRecvChunked() or StrBuf_read_one_chunk_callback(), depending on
 * IO->NextState).
 * A read of 0 bytes stops the client watchers, closes the socket and arms a
 * 0.01 s timeout; a read error other than EAGAIN additionally records
 * "Socket Invalid!" in IO->ErrMsg.
 * watcher->data must point to the owning AsyncIO.
 * NOTE(review): the case labels and the handling of nbytes > 0 are on lines
 * not visible here -- confirm against the full source.
 */
821 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
825 AsyncIO *IO = watcher->data;
827 IO->Now = ev_now(event_base);
828 switch (IO->NextState) {
830 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
832 StrBufPlain(IO->ErrMsg, errmsg, -1);
/* Nothing left of the chunk: switch over to replying. */
835 if (IO->IOB.ChunkSendRemain == 0)
837 IO->NextState = eSendReply;
838 assert(IO->ReadDone);
839 ev_io_stop(event_base, &IO->recv_event);
848 nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd,
/*
 * Debug dump of the received buffer to a file under /tmp.
 * NOTE(review): presumably compiled only in a debug build (guard not visible
 * here); the fopen() result is not checked in the visible lines.
 */
860 const char *pch = ChrPtr(IO->RecvBuf.Buf);
861 const char *pchh = IO->RecvBuf.ReadWritePointer;
866 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
867 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
868 ((CitContext*)(IO->CitContext))->ServiceName,
871 fd = fopen(fn, "a+");
872 fprintf(fd, "Read: BufSize: %ld BufContent: [",
874 rv = fwrite(pchh, nbytes, 1, fd);
875 if (!rv) printf("failed to write debug to %s!\n", fn);
882 } else if (nbytes == 0) {
883 StopClientWatchers(IO, 1);
884 SetNextTimeout(IO, 0.01);
886 } else if (nbytes == -1) {
887 if (errno != EAGAIN) {
888 // FD is gone. kick it.
889 StopClientWatchers(IO, 1);
891 "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n",
892 errno, strerror(errno), IO->SendBuf.fd);
893 StrBufPrintf(IO->ErrMsg,
894 "Socket Invalid! [%s]",
896 SetNextTimeout(IO, 0.01);
/*
 * Idle-watcher callback run after a DNS query has finished: calls the query's
 * PostDNS handler and, for some of its results, the IO's DNS.Fail handler
 * (both must be set; asserted).
 * watcher->data must point to the owning AsyncIO.
 * NOTE(review): the case labels of both switches are not visible here.
 */
903 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
905 AsyncIO *IO = watcher->data;
907 SetEVState(IO, eCaresFinished);
908 IO->Now = ev_now(event_base);
909 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
910 become_session(IO->CitContext);
911 assert(IO->DNS.Query->PostDNS);
912 switch (IO->DNS.Query->PostDNS(IO))
915 assert(IO->DNS.Fail);
916 switch (IO->DNS.Fail(IO)) {
918 //// StopClientWatchers(IO);
/*
 * Create a non-blocking socket for IO->ConnectMe, initialise the receive,
 * send and timeout watchers, and start connecting.
 *
 * IO               the connection to set up; ConnectMe must describe the peer.
 * first_rw_timeout initial value of the read/write timeout timer.
 * (conn_timeout, used for the connect-failure timer, is declared on a line
 * not visible here.)
 *
 * Three outcomes of connect() are handled:
 *  - immediate success: regular I/O is started via set_start_callback();
 *  - EINPROGRESS: conn_event waits for the socket to become usable, guarded
 *    by the conn_fail timer;
 *  - any other error: the conn_fail_immediate idle watcher reports the
 *    failure from within the event loop.
 * On socket()/fcntl() failures IO->ErrMsg is set and both fd fields are
 * reset to 0.
 * NOTE(review): the return values of the early error paths and the condition
 * selecting eReadMessage vs. eSendReply are on lines not visible here.
 */
929 eNextState EvConnectSock(AsyncIO *IO,
931 double first_rw_timeout,
934 struct sockaddr_in egress_sin;
938 SetEVState(IO, eIOConnectSock);
939 become_session(IO->CitContext);
942 IO->NextState = eReadMessage;
945 IO->NextState = eSendReply;
948 IO->SendBuf.fd = IO->RecvBuf.fd =
950 (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
954 if (IO->SendBuf.fd < 0) {
956 "EVENT: socket() failed: %s\n",
959 StrBufPrintf(IO->ErrMsg,
960 "Failed to create socket: %s",
962 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
965 fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
968 "EVENT: unable to get socket %d flags! %s \n",
971 StrBufPrintf(IO->ErrMsg,
972 "Failed to get socket %d flags: %s",
975 close(IO->SendBuf.fd);
976 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
979 fdflags = fdflags | O_NONBLOCK;
980 if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
983 "EVENT: unable to set socket %d nonblocking flags! %s \n",
986 StrBufPrintf(IO->ErrMsg,
987 "Failed to set socket flags: %s",
989 close(IO->SendBuf.fd);
990 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
993 /* TODO: maybe we could use offsetof() to calc the position of data...
994 * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
996 ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
997 IO->recv_event.data = IO;
998 ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
999 IO->send_event.data = IO;
1001 ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
1002 IO->conn_fail.data = IO;
1003 ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
1004 IO->rw_timeout.data = IO;
1009 /* for debugging you may bypass it like this:
1010 * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
1011 * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
1012 * inet_addr("127.0.0.1");
1014 if (IO->ConnectMe->IPv6) {
1015 rc = connect(IO->SendBuf.fd,
1016 &IO->ConnectMe->Addr,
1017 sizeof(struct sockaddr_in6));
1020 /* If citserver is bound to a specific IP address on the host, make
1021 * sure we use that address for outbound connections.
1024 memset(&egress_sin, 0, sizeof(egress_sin));
1025 egress_sin.sin_family = AF_INET;
1026 if (!IsEmptyStr(config.c_ip_addr)) {
1027 egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
/*
 * inet_addr() signals an unparsable address with INADDR_NONE; fall back to
 * INADDR_ANY in that case. The former comparison against !INADDR_ANY tested
 * for the value 1 rather than for the error return.
 */
1028 if (egress_sin.sin_addr.s_addr == INADDR_NONE) {
1029 egress_sin.sin_addr.s_addr = INADDR_ANY;
1032 /* If this bind fails, no problem; we can still use INADDR_ANY */
1033 bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
1035 rc = connect(IO->SendBuf.fd,
1036 (struct sockaddr_in *)&IO->ConnectMe->Addr,
1037 sizeof(struct sockaddr_in));
1041 SetEVState(IO, eIOConnNow);
1042 EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd);
1043 set_start_callback(event_base, IO, 0);
1044 return IO->NextState;
1046 else if (errno == EINPROGRESS) {
1047 SetEVState(IO, eIOConnWait);
1048 EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd);
1050 ev_io_init(&IO->conn_event,
1051 IO_connestd_callback,
1055 IO->conn_event.data = IO;
1057 ev_io_start(event_base, &IO->conn_event);
1058 ev_timer_start(event_base, &IO->conn_fail);
1059 return IO->NextState;
1062 SetEVState(IO, eIOConnfail);
1063 ev_idle_init(&IO->conn_fail_immediate,
1064 IO_connfailimmediate_callback);
1065 IO->conn_fail_immediate.data = IO;
1066 ev_idle_start(event_base, &IO->conn_fail_immediate);
1069 "connect() = %d failed: %s\n",
1073 StrBufPrintf(IO->ErrMsg,
1074 "Failed to connect: %s",
1076 return IO->NextState;
1078 return IO->NextState;
/*
 * Re-arm the read/write timeout of this IO: sets the timer's repeat value to
 * `timeout` and restarts it on event_base with ev_timer_again().
 */
1081 void SetNextTimeout(AsyncIO *IO, double timeout)
1083 IO->rw_timeout.repeat = timeout;
1084 ev_timer_again (event_base, &IO->rw_timeout);
/*
 * Resume socket I/O for an IO on the client event loop: re-registers the
 * abort_by_shutdown cleanup watcher, selects eReadMessage or eSendReply as
 * the next state and starts I/O through set_start_callback().
 * Returns the resulting IO->NextState.
 * NOTE(review): the remaining parameters and the condition choosing between
 * the two states are on lines not visible here.
 */
1088 eNextState ReAttachIO(AsyncIO *IO,
1092 SetEVState(IO, eIOAttach);
1094 become_session(IO->CitContext);
1095 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
1097 IO->NextState = eReadMessage;
1100 IO->NextState = eSendReply;
1102 set_start_callback(event_base, IO, 0);
1104 return IO->NextState;
/*
 * Initialise an AsyncIO for socket based client I/O.
 * Clones the current session context (CC) for the IO, links context and IO
 * to each other, stores the initial state and all callbacks, and allocates
 * the send, receive and line buffers.
 *
 * NextState      initial state of the IO state machine.
 * LineReader     optional line extraction callback (HandleInbound() falls
 *                back to StrBufChunkSipLine() when it is NULL).
 * DNS_Fail, SendDone, ReadDone, Terminate, DBTerminate, ConnFail, Timeout,
 * ShutdownAbort  callbacks stored in the IO; several of them are asserted to
 *                be non-NULL at the point of use.
 * NOTE(review): the `Data` parameter is declared on a line not visible here.
 */
1107 void InitIOStruct(AsyncIO *IO,
1109 eNextState NextState,
1110 IO_LineReaderCallback LineReader,
1111 IO_CallBack DNS_Fail,
1112 IO_CallBack SendDone,
1113 IO_CallBack ReadDone,
1114 IO_CallBack Terminate,
1115 IO_CallBack DBTerminate,
1116 IO_CallBack ConnFail,
1117 IO_CallBack Timeout,
1118 IO_CallBack ShutdownAbort)
/* Give the IO its own copy of the session context and cross-link the two. */
1122 IO->CitContext = CloneContext(CC);
1123 IO->CitContext->session_specific_data = Data;
1124 IO->CitContext->IO = IO;
1126 IO->NextState = NextState;
1128 IO->SendDone = SendDone;
1129 IO->ReadDone = ReadDone;
1130 IO->Terminate = Terminate;
1131 IO->DBTerminate = DBTerminate;
1132 IO->LineReader = LineReader;
1133 IO->ConnFail = ConnFail;
1134 IO->Timeout = Timeout;
1135 IO->ShutdownAbort = ShutdownAbort;
1137 IO->DNS.Fail = DNS_Fail;
1139 IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024);
1140 IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
1141 IO->IOBuf = NewStrBuf();
1142 EV_syslog(LOG_DEBUG,
1143 "EVENT: Session lives at %p IO at %p \n",
/* Initialiser of the curl integration; defined in another translation unit. */
1148 extern int evcurl_init(AsyncIO *IO);
/*
 * Initialise an AsyncIO for a curl driven HTTP request.
 * Clones the current session context (CC) for the IO, links context and IO
 * to each other, stores the callbacks and the request description, then
 * returns the result of evcurl_init().
 * NOTE(review): `Data` and `Desc` are declared on lines not visible here.
 * NOTE(review): Desc is copied with an unbounded strcpy(); the size of
 * HttpReq.errdesc is not visible here -- confirm it cannot overflow.
 */
1150 int InitcURLIOStruct(AsyncIO *IO,
1153 IO_CallBack SendDone,
1154 IO_CallBack Terminate,
1155 IO_CallBack DBTerminate,
1156 IO_CallBack ShutdownAbort)
1160 IO->CitContext = CloneContext(CC);
1161 IO->CitContext->session_specific_data = Data;
1162 IO->CitContext->IO = IO;
1164 IO->SendDone = SendDone;
1165 IO->Terminate = Terminate;
1166 IO->DBTerminate = DBTerminate;
1167 IO->ShutdownAbort = ShutdownAbort;
1169 strcpy(IO->HttpReq.errdesc, Desc);
1172 return evcurl_init(IO);
/*
 * Helper context used to abort another AsyncIO from within an event loop.
 * NOTE(review): the members are on lines not visible here; the code below
 * uses an embedded `IO` and a pointer `OtherOne` to the IO to be killed.
 */
1177 typedef struct KillOtherSessionContext {
1180 }KillOtherSessionContext;
/*
 * Terminate callback of the kill helper: frees the contents of the helper's
 * IO, wipes the whole KillOtherSessionContext and writes the previously saved
 * id back into IO->ID for post-mortem analysis.
 * NOTE(review): the declaration of `id`, the release of Ctx and the return
 * value are on lines not visible here.
 */
1182 eNextState KillTerminate(AsyncIO *IO)
1185 KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data;
1186 EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__);
1188 FreeAsyncIOContents(IO);
1189 memset(Ctx, 0, sizeof(KillOtherSessionContext));
1190 IO->ID = id; /* just for the case we want to analyze it in a coredump */
/* Shutdown callback of the kill helper: always requests termination. */
1196 eNextState KillShutdown(AsyncIO *IO)
1198 return eTerminateConnection;
/*
 * Queued handler of the kill helper: invokes the ShutdownAbort callback of
 * the target IO (Ctx->OtherOne) if it has one, then asks for the helper's own
 * termination by returning eTerminateConnection.
 */
1201 eNextState KillOtherContextNow(AsyncIO *IO)
1203 KillOtherSessionContext *Ctx = IO->Data;
1205 SetEVState(IO, eKill);
1207 if (Ctx->OtherOne->ShutdownAbort != NULL)
1208 Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne);
1209 return eTerminateConnection;
/*
 * Abort another AsyncIO from the event loop it currently lives in.
 * Allocates a zeroed KillOtherSessionContext, initialises its embedded IO and
 * queues KillOtherContextNow either on the client I/O loop
 * (QueueEventContext) or on the DB loop (QueueDBOperation), chosen by the
 * target's IO->NextState.
 * NOTE(review): no check of the malloc() result is visible; the case labels
 * and the arguments of InitIOStruct() are on lines not visible here.
 */
1212 void KillAsyncIOContext(AsyncIO *IO)
1214 KillOtherSessionContext *Ctx;
1216 Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext));
1217 memset(Ctx, 0, sizeof(KillOtherSessionContext));
1219 InitIOStruct(&Ctx->IO,
1234 switch(IO->NextState) {
1247 QueueEventContext(&Ctx->IO, KillOtherContextNow);
1250 QueueDBOperation(&Ctx->IO, KillOtherContextNow);
1252 case eTerminateConnection:
1254 /*hm, its already dying, dunno which Queue its in... */
/* Debug switch defined elsewhere; a value of 0 disables EV_backtrace(). */
1260 extern int DebugEventLoopBacktrace;
/*
 * Log the current call stack (up to 50 frames) for the given IO at LOG_ALERT
 * level. Only active when built with HAVE_BACKTRACE; does nothing when IO is
 * NULL or DebugEventLoopBacktrace is 0.
 * Frames are printed symbolically when backtrace_symbols() succeeded and as
 * raw addresses otherwise.
 * NOTE(review): the release of the array returned by backtrace_symbols() is
 * not visible here -- confirm it is freed.
 */
1261 void EV_backtrace(AsyncIO *IO)
1263 #ifdef HAVE_BACKTRACE
1264 void *stack_frames[50];
1268 if ((IO == NULL) || (DebugEventLoopBacktrace == 0))
1270 size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
1271 strings = backtrace_symbols(stack_frames, size);
1272 for (i = 0; i < size; i++) {
1273 if (strings != NULL) {
1274 EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
1277 EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
/* Return the current timestamp of the client I/O event loop (ev_now() on event_base). */
1285 ev_tstamp ctdl_ev_now (void)
1287 return ev_now(event_base);