2 * Copyright (c) 1998-2012 by the citadel.org team
4 * This program is open source software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 3.
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
22 #include <sys/types.h>
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
30 # include <sys/time.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
47 #include <libcitadel.h>
50 #include "citserver.h"
57 #include "internet_addressing.h"
60 #include "clientsocket.h"
61 #include "locate_host.h"
62 #include "citadel_dirs.h"
64 #include "event_client.h"
65 #include "ctdl_module.h"
/*
 * Human-readable labels for the I/O states.  SetEVState() indexes this
 * table with an eIOState value and copies the label into the session's
 * lastcmdname field.
 * NOTE(review): the entries presumably have to stay in the same order as
 * the eIOState enum (declared elsewhere) -- confirm before adding states.
 * NOTE(review): "Curl Shotdown" looks like a typo for "Curl Shutdown";
 * it is a runtime string and is deliberately left untouched here.
 */
68 ConstStr IOStates[] = {
75 {HKEY("DB Terminate")},
78 {HKEY("IO Connect Socket")},
81 {HKEY("IO ConnFail")},
82 {HKEY("IO ConnFail Now")},
83 {HKEY("IO Conn Now")},
84 {HKEY("IO Conn Wait")},
87 {HKEY("Curl Shotdown")},
88 {HKEY("Curl More IO")},
89 {HKEY("Curl Got IO")},
90 {HKEY("Curl Got Data")},
91 {HKEY("Curl Got Status")},
92 {HKEY("C-Ares Start")},
93 {HKEY("C-Ares IO Done")},
94 {HKEY("C-Ares Finished")},
95 {HKEY("C-Ares exit")},
/*
 * Record State as the current I/O state of IO: the matching IOStates[]
 * label is copied into lastcmdname of the CitContext attached to IO.
 * len + 1 bytes are copied, presumably so the terminating NUL of the
 * label is included.
 * NOTE(review): in the lines visible here IO->CitContext is dereferenced
 * without a NULL check and State is not range-checked against
 * IOStates[] -- confirm callers guarantee both.
 */
100 void SetEVState(AsyncIO *IO, eIOState State)
103 CitContext* CCC = IO->CitContext;
104 memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1);
109 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
110 static void IO_abort_shutdown_callback(struct ev_loop *loop,
115 /*------------------------------------------------------------------------------
117 *----------------------------------------------------------------------------*/
118 extern int evdb_count;
119 extern pthread_mutex_t DBEventQueueMutex;
120 extern pthread_mutex_t DBEventExitQueueMutex;
121 extern HashList *DBInboundEventQueue;
122 extern struct ev_loop *event_db;
123 extern ev_async DBAddJob;
124 extern ev_async DBExitEventLoop;
/*
 * Hand IO over to the database event loop.
 * Marks IO as eDBQ, allocates an IOAddHandler and initialises the
 * db_abort_by_shutdown cleanup watcher (callback
 * IO_abort_shutdown_callback, data = IO).  While holding
 * DBEventQueueMutex the handler is Put() into DBInboundEventQueue; a
 * NULL queue is treated as "shutting down".  While holding
 * DBEventExitQueueMutex the loop is woken with
 * ev_async_send(event_db, &DBAddJob), unless event_db is NULL.
 * NOTE(review): presumably CB is stored in the handler and later run by
 * the DB loop -- verify.
 * NOTE(review): the malloc() result is not checked in the lines visible
 * here -- confirm, and add a NULL check if that is really the case.
 */
126 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
131 SetEVState(IO, eDBQ);
132 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
135 ev_cleanup_init(&IO->db_abort_by_shutdown,
136 IO_abort_shutdown_callback);
137 IO->db_abort_by_shutdown.data = IO;
139 pthread_mutex_lock(&DBEventQueueMutex);
140 if (DBInboundEventQueue == NULL)
142 /* shutting down... */
144 EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
145 pthread_mutex_unlock(&DBEventQueueMutex);
148 EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
150 Put(DBInboundEventQueue, IKEY(i), h, NULL);
151 pthread_mutex_unlock(&DBEventQueueMutex);
153 pthread_mutex_lock(&DBEventExitQueueMutex);
154 if (event_db == NULL)
156 pthread_mutex_unlock(&DBEventExitQueueMutex);
159 ev_async_send (event_db, &DBAddJob);
160 pthread_mutex_unlock(&DBEventExitQueueMutex);
162 EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
/*
 * Mark IO as eDBStop and stop its two watchers on the database loop
 * (event_db): the shutdown cleanup watcher and the db_unwind_stack idle
 * watcher.
 */
166 void StopDBWatchers(AsyncIO *IO)
168 SetEVState(IO, eDBStop);
169 ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
170 ev_idle_stop(event_db, &IO->db_unwind_stack);
/*
 * Terminate a client that currently lives on the database loop.
 * Marks IO as eDBTerm, logs the termination and asserts that a
 * DBTerminate callback has been installed.
 * NOTE(review): presumably the DBTerminate callback is invoked right
 * after the assert and Ctx is used around it -- verify against the full
 * function.
 */
173 void ShutDownDBCLient(AsyncIO *IO)
175 CitContext *Ctx =IO->CitContext;
178 SetEVState(IO, eDBTerm);
179 EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
182 assert(IO->DBTerminate);
/*
 * Idle-watcher callback on the database loop; watcher->data carries the
 * AsyncIO.  Marks IO as eDBNext, records the loop time in IO->Now,
 * switches to the session context, stops the db_unwind_stack idle
 * watcher and then runs IO->NextDBOperation (asserted non-NULL).
 * The returned eNextState is dispatched in a switch: the visible arms
 * stop the shutdown cleanup watcher, and eTerminateConnection
 * additionally shuts the DB client down via ShutDownDBCLient().
 */
187 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
189 AsyncIO *IO = watcher->data;
191 SetEVState(IO, eDBNext);
192 IO->Now = ev_now(event_db);
193 EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
194 become_session(IO->CitContext);
196 ev_idle_stop(event_db, &IO->db_unwind_stack);
198 assert(IO->NextDBOperation);
199 switch (IO->NextDBOperation(IO))
213 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
215 case eTerminateConnection:
217 ev_idle_stop(event_db, &IO->db_unwind_stack);
218 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
219 ShutDownDBCLient(IO);
/*
 * Schedule CB as the next database step for IO.
 * Marks IO as eQDBNext, stores CB in IO->NextDBOperation and (re)arms
 * the db_unwind_stack idle watcher on event_db with IO as its data.
 * NOTE(review): the watcher callback is presumably DB_PerformNext, so
 * that CB runs from the loop rather than on the current call stack --
 * confirm.
 */
223 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
225 SetEVState(IO, eQDBNext);
226 IO->NextDBOperation = CB;
227 ev_idle_init(&IO->db_unwind_stack,
229 IO->db_unwind_stack.data = IO;
230 ev_idle_start(event_db, &IO->db_unwind_stack);
234 /*------------------------------------------------------------------------------
236 *----------------------------------------------------------------------------*/
237 extern int evbase_count;
238 extern pthread_mutex_t EventQueueMutex;
239 extern pthread_mutex_t EventExitQueueMutex;
240 extern HashList *InboundEventQueue;
241 extern struct ev_loop *event_base;
242 extern ev_async AddJob;
243 extern ev_async ExitEventLoop;
/*
 * Cleanup-watcher callback; watcher->data carries the AsyncIO.
 * Marks IO as eIOAbort, logs, records the loop time and calls the
 * ShutdownAbort callback of IO (asserted non-NULL).
 * NOTE(review): this callback is registered both for abort_by_shutdown
 * and for db_abort_by_shutdown, yet it always reads ev_now(event_base)
 * rather than using the loop argument -- confirm this is intended for
 * the DB loop case.
 */
245 static void IO_abort_shutdown_callback(struct ev_loop *loop,
249 AsyncIO *IO = watcher->data;
251 SetEVState(IO, eIOAbort);
252 EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
253 IO->Now = ev_now(event_base);
254 assert(IO->ShutdownAbort);
255 IO->ShutdownAbort(IO);
/*
 * Hand IO over to the network event loop.
 * Marks IO as eIOQ, allocates an IOAddHandler and initialises the
 * abort_by_shutdown cleanup watcher (callback
 * IO_abort_shutdown_callback, data = IO).  While holding EventQueueMutex
 * the handler is Put() into InboundEventQueue; a NULL queue is treated
 * as "shutting down".  While holding EventExitQueueMutex the loop is
 * woken with ev_async_send(event_base, &AddJob), unless event_base is
 * NULL.
 * NOTE(review): presumably CB is stored in the handler and later run by
 * the event loop -- verify.
 * NOTE(review): the malloc() result is not checked in the lines visible
 * here -- confirm, and add a NULL check if that is really the case.
 */
259 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
264 SetEVState(IO, eIOQ);
265 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
268 ev_cleanup_init(&IO->abort_by_shutdown,
269 IO_abort_shutdown_callback);
270 IO->abort_by_shutdown.data = IO;
272 pthread_mutex_lock(&EventQueueMutex);
273 if (InboundEventQueue == NULL)
276 /* shutting down... */
277 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
278 pthread_mutex_unlock(&EventQueueMutex);
281 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
283 Put(InboundEventQueue, IKEY(i), h, NULL);
284 pthread_mutex_unlock(&EventQueueMutex);
286 pthread_mutex_lock(&EventExitQueueMutex);
287 if (event_base == NULL) {
288 pthread_mutex_unlock(&EventExitQueueMutex);
291 ev_async_send (event_base, &AddJob);
292 pthread_mutex_unlock(&EventExitQueueMutex);
293 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
297 extern eNextState evcurl_handle_start(AsyncIO *IO);
/*
 * Hand a cURL-driven IO over to the network event loop.
 * Marks IO as eCurlQ and allocates an IOAddHandler whose EvAttch
 * callback is evcurl_handle_start.  While holding EventQueueMutex the
 * handler is Put() into InboundEventQueue; a NULL queue is treated as
 * "shutting down".  While holding EventExitQueueMutex the loop is woken
 * with ev_async_send(event_base, &AddJob), unless event_base is NULL.
 * NOTE(review): the malloc() result is not checked in the lines visible
 * here -- confirm, and add a NULL check if that is really the case.
 */
299 eNextState QueueCurlContext(AsyncIO *IO)
304 SetEVState(IO, eCurlQ);
305 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
307 h->EvAttch = evcurl_handle_start;
309 pthread_mutex_lock(&EventQueueMutex);
310 if (InboundEventQueue == NULL)
312 /* shutting down... */
314 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
315 pthread_mutex_unlock(&EventQueueMutex);
319 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
321 Put(InboundEventQueue, IKEY(i), h, NULL);
322 pthread_mutex_unlock(&EventQueueMutex);
324 pthread_mutex_lock(&EventExitQueueMutex);
325 if (event_base == NULL) {
326 pthread_mutex_unlock(&EventExitQueueMutex);
329 ev_async_send (event_base, &AddJob);
330 pthread_mutex_unlock(&EventExitQueueMutex);
332 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
336 void DestructCAres(AsyncIO *IO);
/*
 * Release the resources owned by IO: the IOBuf, send and receive
 * StrBufs, the ConnectMe URL and the HTTP reply buffer.  The attached
 * CitContext is put into state CON_IDLE and then detached from IO
 * (IO->CitContext = NULL).  The AsyncIO structure itself is not freed in
 * the lines visible here.
 * NOTE(review): Ctx is written through; check whether the full function
 * guards against a NULL CitContext (e.g. on a second call).
 */
337 void FreeAsyncIOContents(AsyncIO *IO)
339 CitContext *Ctx = IO->CitContext;
341 FreeStrBuf(&IO->IOBuf);
342 FreeStrBuf(&IO->SendBuf.Buf);
343 FreeStrBuf(&IO->RecvBuf.Buf);
347 FreeURL(&IO->ConnectMe);
348 FreeStrBuf(&IO->HttpReq.ReplyData);
351 Ctx->state = CON_IDLE;
353 IO->CitContext = NULL;
/*
 * Stop every watcher IO has on the network loop (event_base): both
 * timers, the unwind_stack idle watcher, the shutdown cleanup watcher
 * and the connect/send/receive io watchers.
 * If CloseFD is non-zero and the socket descriptor is positive, the
 * socket is closed as well.
 */
358 void StopClientWatchers(AsyncIO *IO, int CloseFD)
360 ev_timer_stop (event_base, &IO->rw_timeout);
361 ev_timer_stop(event_base, &IO->conn_fail);
362 ev_idle_stop(event_base, &IO->unwind_stack);
363 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
365 ev_io_stop(event_base, &IO->conn_event);
366 ev_io_stop(event_base, &IO->send_event);
367 ev_io_stop(event_base, &IO->recv_event);
369 if (CloseFD && (IO->SendBuf.fd > 0)) {
370 close(IO->SendBuf.fd);
/*
 * Stop every watcher IO has on the network loop (event_base) and close
 * its socket descriptor when that descriptor is non-zero.
 * NOTE(review): the close guard here is "fd != 0" whereas
 * StopClientWatchers() uses "fd > 0"; with this test a negative
 * descriptor would be handed to close().  Confirm which guard is
 * intended and align the two.
 */
376 void StopCurlWatchers(AsyncIO *IO)
378 ev_timer_stop (event_base, &IO->rw_timeout);
379 ev_timer_stop(event_base, &IO->conn_fail);
380 ev_idle_stop(event_base, &IO->unwind_stack);
381 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
383 ev_io_stop(event_base, &IO->conn_event);
384 ev_io_stop(event_base, &IO->send_event);
385 ev_io_stop(event_base, &IO->recv_event);
387 if (IO->SendBuf.fd != 0) {
388 close(IO->SendBuf.fd);
/*
 * Terminate a client that lives on the network loop.
 * Marks IO as eExit, stops all client watchers (closing the socket) and,
 * if a c-ares channel exists, destroys it, stops the two DNS io watchers
 * and clears the channel pointer.  Finally asserts that a Terminate
 * callback is installed.
 * NOTE(review): presumably the Terminate callback is invoked right after
 * the assert -- verify against the full function.
 * NOTE(review): the DNS watchers are stopped after ares_destroy(); check
 * that this order is safe.
 */
394 void ShutDownCLient(AsyncIO *IO)
396 CitContext *Ctx =IO->CitContext;
398 SetEVState(IO, eExit);
401 EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
403 StopClientWatchers(IO, 1);
405 if (IO->DNS.Channel != NULL) {
406 ares_destroy(IO->DNS.Channel);
407 EV_DNS_LOG_STOP(DNS.recv_event);
408 EV_DNS_LOG_STOP(DNS.send_event);
409 ev_io_stop(event_base, &IO->DNS.recv_event);
410 ev_io_stop(event_base, &IO->DNS.send_event);
411 IO->DNS.Channel = NULL;
413 assert(IO->Terminate);
/*
 * Act on IO->NextState after inbound data has been handled.
 * Depending on the state the send or receive watcher is (re)started on
 * event_base; in one arm the SendDone callback (asserted non-NULL) is
 * run and its result is dispatched in a nested switch, which may start
 * the send watcher again or stop all client watchers without closing the
 * socket.
 * NOTE(review): the case labels of most arms are not visible in this
 * view, so the exact state-to-action mapping must be read from the full
 * function.
 */
417 void PostInbound(AsyncIO *IO)
419 switch (IO->NextState) {
421 ev_io_start(event_base, &IO->send_event);
425 assert(IO->SendDone);
426 IO->NextState = IO->SendDone(IO);
427 switch (IO->NextState)
436 ev_io_start(event_base, &IO->send_event);
439 StopClientWatchers(IO, 0);
447 ev_io_start(event_base, &IO->recv_event);
449 case eTerminateConnection:
/*
 * Consume buffered inbound data for IO.
 * After switching to the session context, loops while the buffer still
 * holds data and IO->NextState is one of the read states (eReadMessage,
 * eReadMore, eReadFile, eReadPayload).
 *  - In eReadFile the already-read bytes are written out through
 *    WriteIOBAlreadyRead(); on eReadSuccess the state becomes eSendReply.
 *  - Otherwise a line is extracted, by IO->LineReader when one is set,
 *    else by StrBufChunkSipLine().
 * When the result is anything but eMustReadMore, the receive watcher is
 * stopped, the ReadDone callback (asserted non-NULL) chooses the next
 * state and the receive buffer is re-checked for remaining data.
 */
463 eReadState HandleInbound(AsyncIO *IO)
465 const char *Err = NULL;
466 eReadState Finished = eBufferNotEmpty;
468 become_session(IO->CitContext);
470 while ((Finished == eBufferNotEmpty) &&
471 ((IO->NextState == eReadMessage)||
472 (IO->NextState == eReadMore)||
473 (IO->NextState == eReadFile)||
474 (IO->NextState == eReadPayload)))
477 * lex line reply in callback,
478 * or do it ourselves.
479 * i.e. as nnn-blabla means continue reading in SMTP
481 if ((IO->NextState == eReadFile) &&
482 (Finished == eBufferNotEmpty))
484 Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
485 if (Finished == eReadSuccess)
487 IO->NextState = eSendReply;
490 else if (IO->LineReader)
491 Finished = IO->LineReader(IO);
493 Finished = StrBufChunkSipLine(IO->IOBuf,
497 case eMustReadMore: /// read new from socket...
499 case eBufferNotEmpty: /* shouldn't happen... */
500 case eReadSuccess: /// done for now...
502 case eReadFail: /// WHUT?
507 if (Finished != eMustReadMore) {
508 assert(IO->ReadDone);
509 ev_io_stop(event_base, &IO->recv_event);
510 IO->NextState = IO->ReadDone(IO);
511 Finished = StrBufCheckBuffer(&IO->RecvBuf);
/*
 * io-watcher callback for a writable socket; watcher->data carries the
 * AsyncIO.
 * Records the loop time, switches to the session context and writes one
 * chunk: FileSendChunked() in one state (an error text is copied into
 * IO->ErrMsg), StrBuf_write_one_chunk_callback() otherwise.
 * After the write the send watcher is stopped and IO->NextState is
 * dispatched: SendDone / ReadDone callbacks (asserted non-NULL) pick the
 * follow-up state, the send or receive watcher is restarted as needed,
 * and the shutdown cleanup watcher is removed when the IO moves to
 * another queue.
 * On a write error other than EAGAIN all client watchers are stopped
 * (closing the socket), the error is logged and stored in IO->ErrMsg,
 * and the rw timeout is set to 0.01s so the timeout handler runs soon.
 * NOTE(review): the block that dumps traffic to /tmp/foolog_ev_* is
 * presumably compiled only under a debug #ifdef; the fopen() result is
 * not checked in the lines visible here.
 */
522 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
525 AsyncIO *IO = watcher->data;
526 const char *errmsg = NULL;
528 IO->Now = ev_now(event_base);
529 become_session(IO->CitContext);
535 const char *pch = ChrPtr(IO->SendBuf.Buf);
536 const char *pchh = IO->SendBuf.ReadWritePointer;
542 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
543 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
544 ((CitContext*)(IO->CitContext))->ServiceName,
547 fd = fopen(fn, "a+");
548 fprintf(fd, "Send: BufSize: %ld BufContent: [",
550 rv = fwrite(pchh, nbytes, 1, fd);
551 if (!rv) printf("failed to write debug to %s!\n", fn);
554 switch (IO->NextState) {
556 rc = FileSendChunked(&IO->IOB, &errmsg);
558 StrBufPlain(IO->ErrMsg, errmsg, -1);
561 rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd,
567 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
573 ev_io_stop(event_base, &IO->send_event);
574 switch (IO->NextState) {
576 assert(IO->SendDone);
577 IO->NextState = IO->SendDone(IO);
579 if ((IO->NextState == eTerminateConnection) ||
580 (IO->NextState == eAbort) )
583 ev_io_start(event_base, &IO->send_event);
587 if (IO->IOB.ChunkSendRemain > 0) {
588 ev_io_start(event_base, &IO->recv_event);
589 SetNextTimeout(IO, 100.0);
592 assert(IO->ReadDone);
593 IO->NextState = IO->ReadDone(IO);
594 switch(IO->NextState) {
603 ev_io_start(event_base,
611 case eTerminateConnection:
618 if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
620 IO->NextState = eReadMore;
625 if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
630 ev_io_start(event_base, &IO->recv_event);
636 * we now live in another queue,
637 * so we have to unregister.
639 ev_cleanup_stop(loop, &IO->abort_by_shutdown);
644 case eTerminateConnection:
650 if (errno != EAGAIN) {
651 StopClientWatchers(IO, 1);
653 "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n",
654 errno, strerror(errno), IO->SendBuf.fd);
655 StrBufPrintf(IO->ErrMsg,
656 "Socket Invalid! [%s]",
658 SetNextTimeout(IO, 0.01);
661 /* else : must write more. */
/*
 * Kick off I/O once a connection is usable: the connect-failure timer is
 * stopped and the read/write timeout timer is started, then
 * IO->NextState decides the first action.  In the visible arms the
 * receive watcher is started (with "[while waiting for greeting]"
 * appended to IO->ErrMsg), or the send path is entered directly by
 * calling IO_send_callback() under the session context.
 */
664 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
666 ev_timer_stop(event_base, &IO->conn_fail);
667 ev_timer_start(event_base, &IO->rw_timeout);
669 switch(IO->NextState) {
673 StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0);
674 ev_io_start(event_base, &IO->recv_event);
680 become_session(IO->CitContext);
681 IO_send_callback(loop, &IO->send_event, revents);
687 case eTerminateConnection:
/*
 * Timer callback for the read/write timeout; watcher->data carries the
 * AsyncIO.
 * Marks IO as eIOTimeout, records the loop time, stops the timer and
 * switches to the session context.  If a socket is open (fd != 0) the
 * send/receive watchers are stopped, the socket is closed and both fd
 * fields are reset to 0.  The Timeout callback of IO then chooses the
 * next state, which is dispatched in a switch.
 * NOTE(review): rw_timeout is stopped twice (harmless but redundant),
 * and no assert guards IO->Timeout, unlike the other callbacks in this
 * file.
 * NOTE(review): 0 is used as the "no socket" sentinel although 0 is a
 * valid descriptor and other code in this file tests fd > 0 -- confirm.
 */
695 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
697 AsyncIO *IO = watcher->data;
699 SetEVState(IO, eIOTimeout);
700 IO->Now = ev_now(event_base);
701 ev_timer_stop (event_base, &IO->rw_timeout);
702 become_session(IO->CitContext);
704 if (IO->SendBuf.fd != 0)
706 ev_io_stop(event_base, &IO->send_event);
707 ev_io_stop(event_base, &IO->recv_event);
708 ev_timer_stop (event_base, &IO->rw_timeout);
709 close(IO->SendBuf.fd);
710 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
714 switch (IO->Timeout(IO))
/*
 * Timer callback fired when connecting took too long; watcher->data
 * carries the AsyncIO.  IO_connestd_callback() also calls it directly
 * when the socket reports an error.
 * Marks IO as eIOConnfail, records the loop time and stops the
 * connect-failure timer.  If a socket is open (fd != 0) all io watchers
 * and the rw timeout are stopped, the socket is closed and both fd
 * fields are reset to 0.  The ConnFail callback (asserted non-NULL) is
 * then run under the session context and its result is dispatched in a
 * switch.
 */
724 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
726 AsyncIO *IO = watcher->data;
728 SetEVState(IO, eIOConnfail);
729 IO->Now = ev_now(event_base);
730 ev_timer_stop (event_base, &IO->conn_fail);
732 if (IO->SendBuf.fd != 0)
734 ev_io_stop(loop, &IO->conn_event);
735 ev_io_stop(event_base, &IO->send_event);
736 ev_io_stop(event_base, &IO->recv_event);
737 ev_timer_stop (event_base, &IO->rw_timeout);
738 close(IO->SendBuf.fd);
739 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
741 become_session(IO->CitContext);
743 assert(IO->ConnFail);
744 switch (IO->ConnFail(IO))
/*
 * Idle-watcher callback used when connect() failed straight away;
 * watcher->data carries the AsyncIO.
 * Marks IO as eIOConnfailNow, records the loop time and stops the
 * conn_fail_immediate idle watcher.  An open socket (fd != 0) is closed
 * and both fd fields are reset to 0.  The ConnFail callback (asserted
 * non-NULL) is then run under the session context and its result is
 * dispatched in a switch.
 */
755 IO_connfailimmediate_callback(struct ev_loop *loop,
759 AsyncIO *IO = watcher->data;
761 SetEVState(IO, eIOConnfailNow);
762 IO->Now = ev_now(event_base);
763 ev_idle_stop (event_base, &IO->conn_fail_immediate);
765 if (IO->SendBuf.fd != 0)
767 close(IO->SendBuf.fd);
768 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
770 become_session(IO->CitContext);
772 assert(IO->ConnFail);
773 switch (IO->ConnFail(IO))
/*
 * io-watcher callback fired when a non-blocking connect() completes;
 * watcher->data carries the AsyncIO.
 * Marks IO as eIOConnNow, records the loop time and stops the connect
 * watcher and the connect-failure timer.  The socket's pending error is
 * then fetched with getsockopt(): if the call succeeded and reports a
 * non-zero error, the failure is logged and IO_connfail_callback() is
 * invoked; otherwise I/O is started via set_start_callback().
 * NOTE(review): the first "connect() succeeded." log line is emitted
 * before the socket error has been checked, so it also appears for
 * failed connects.
 */
784 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
786 AsyncIO *IO = watcher->data;
788 socklen_t lon = sizeof(so_err);
791 SetEVState(IO, eIOConnNow);
792 IO->Now = ev_now(event_base);
793 EVM_syslog(LOG_DEBUG, "connect() succeeded.\n");
795 ev_io_stop(loop, &IO->conn_event);
796 ev_timer_stop(event_base, &IO->conn_fail);
798 err = getsockopt(IO->SendBuf.fd,
804 if ((err == 0) && (so_err != 0))
806 EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n",
809 IO_connfail_callback(loop, &IO->conn_fail, revents);
814 EVM_syslog(LOG_DEBUG, "connect() succeeded\n");
815 set_start_callback(loop, IO, revents);
/*
 * io-watcher callback for a readable socket; watcher->data carries the
 * AsyncIO.
 * Records the loop time and reads one chunk: FileRecvChunked() in one
 * state (an error text is copied into IO->ErrMsg; when no chunk bytes
 * remain the state becomes eSendReply and the receive watcher is
 * stopped), StrBuf_read_one_chunk_callback() otherwise.
 * Result handling in the visible lines:
 *  - nbytes == 0: all client watchers are stopped (closing the socket)
 *    and the rw timeout is set to 0.01s;
 *  - nbytes == -1 with errno other than EAGAIN: same, plus the error is
 *    logged and stored in IO->ErrMsg.
 * NOTE(review): the block that dumps traffic to /tmp/foolog_ev_* is
 * presumably compiled only under a debug #ifdef; the fopen() result is
 * not checked in the lines visible here.
 */
820 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
824 AsyncIO *IO = watcher->data;
826 IO->Now = ev_now(event_base);
827 switch (IO->NextState) {
829 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
831 StrBufPlain(IO->ErrMsg, errmsg, -1);
834 if (IO->IOB.ChunkSendRemain == 0)
836 IO->NextState = eSendReply;
837 assert(IO->ReadDone);
838 ev_io_stop(event_base, &IO->recv_event);
847 nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd,
859 const char *pch = ChrPtr(IO->RecvBuf.Buf);
860 const char *pchh = IO->RecvBuf.ReadWritePointer;
865 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
866 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
867 ((CitContext*)(IO->CitContext))->ServiceName,
870 fd = fopen(fn, "a+");
871 fprintf(fd, "Read: BufSize: %ld BufContent: [",
873 rv = fwrite(pchh, nbytes, 1, fd);
874 if (!rv) printf("failed to write debug to %s!\n", fn);
881 } else if (nbytes == 0) {
882 StopClientWatchers(IO, 1);
883 SetNextTimeout(IO, 0.01);
885 } else if (nbytes == -1) {
886 if (errno != EAGAIN) {
887 // FD is gone. kick it.
888 StopClientWatchers(IO, 1);
890 "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n",
891 errno, strerror(errno), IO->SendBuf.fd);
892 StrBufPrintf(IO->ErrMsg,
893 "Socket Invalid! [%s]",
895 SetNextTimeout(IO, 0.01);
/*
 * Idle-watcher callback run after a DNS query finished; watcher->data
 * carries the AsyncIO.
 * Marks IO as eCaresFinished, records the loop time, switches to the
 * session context and runs the PostDNS callback of the query (asserted
 * non-NULL).  In one result arm the DNS.Fail callback (asserted
 * non-NULL) is run and its result is dispatched in a nested switch.
 * NOTE(review): IO->DNS.Query itself is dereferenced without a NULL
 * check in the lines visible here.
 */
902 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
904 AsyncIO *IO = watcher->data;
906 SetEVState(IO, eCaresFinished);
907 IO->Now = ev_now(event_base);
908 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
909 become_session(IO->CitContext);
910 assert(IO->DNS.Query->PostDNS);
911 switch (IO->DNS.Query->PostDNS(IO))
914 assert(IO->DNS.Fail);
915 switch (IO->DNS.Fail(IO)) {
917 //// StopClientWatchers(IO);
/*
 * Create a non-blocking socket for IO->ConnectMe and start connecting.
 *
 * Steps visible here:
 *  1. Mark IO as eIOConnectSock, switch to the session context and pick
 *     the initial NextState (eReadMessage or eSendReply).
 *  2. Create the socket (PF_INET6 or PF_INET depending on
 *     ConnectMe->IPv6) and store it in both SendBuf.fd and RecvBuf.fd.
 *  3. Add O_NONBLOCK through fcntl(); on any failure the error is logged
 *     and stored in IO->ErrMsg, the socket is closed and both fd fields
 *     are reset to 0.
 *  4. Initialise the receive/send io watchers and the conn_fail /
 *     rw_timeout timers, each with IO as its data pointer.
 *  5. connect(): for IPv4 the socket is first bound to config.c_ip_addr
 *     (best effort; a bind failure is ignored on purpose).
 *  6. Outcome: immediate success starts I/O via set_start_callback();
 *     EINPROGRESS arms conn_event plus the conn_fail timer; any other
 *     error schedules IO_connfailimmediate_callback through an idle
 *     watcher and records the error in IO->ErrMsg.
 *
 * Returns IO->NextState on the visible return paths.
 *
 * NOTE(review): the test "s_addr == !INADDR_ANY" compares against the
 * logical negation of INADDR_ANY, i.e. the integer 1 when INADDR_ANY is
 * 0.  inet_addr() signals a parse failure with INADDR_NONE, so this was
 * presumably meant to be "== INADDR_NONE"; as written, an unparsable
 * c_ip_addr is not reset to INADDR_ANY.  Confirm and fix.
 * NOTE(review): 0 is used as the "no socket" sentinel although 0 is a
 * valid descriptor -- confirm this is consistent with the fd > 0 test in
 * StopClientWatchers().
 */
928 eNextState EvConnectSock(AsyncIO *IO,
930 double first_rw_timeout,
933 struct sockaddr_in egress_sin;
937 SetEVState(IO, eIOConnectSock);
938 become_session(IO->CitContext);
941 IO->NextState = eReadMessage;
944 IO->NextState = eSendReply;
947 IO->SendBuf.fd = IO->RecvBuf.fd =
949 (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
953 if (IO->SendBuf.fd < 0) {
955 "EVENT: socket() failed: %s\n",
958 StrBufPrintf(IO->ErrMsg,
959 "Failed to create socket: %s",
961 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
964 fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
967 "EVENT: unable to get socket %d flags! %s \n",
970 StrBufPrintf(IO->ErrMsg,
971 "Failed to get socket %d flags: %s",
974 close(IO->SendBuf.fd);
975 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
978 fdflags = fdflags | O_NONBLOCK;
979 if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
982 "EVENT: unable to set socket %d nonblocking flags! %s \n",
985 StrBufPrintf(IO->ErrMsg,
986 "Failed to set socket flags: %s",
988 close(IO->SendBuf.fd);
989 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
992 /* TODO: maybe we could use offsetof() to calc the position of data...
993 * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
995 ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
996 IO->recv_event.data = IO;
997 ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
998 IO->send_event.data = IO;
1000 ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
1001 IO->conn_fail.data = IO;
1002 ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
1003 IO->rw_timeout.data = IO;
1008 /* for debugging you may bypass it like this:
1009 * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
1010 * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
1011 * inet_addr("127.0.0.1");
1013 if (IO->ConnectMe->IPv6) {
1014 rc = connect(IO->SendBuf.fd,
1015 &IO->ConnectMe->Addr,
1016 sizeof(struct sockaddr_in6));
1019 /* If citserver is bound to a specific IP address on the host, make
1020 * sure we use that address for outbound connections.
1023 memset(&egress_sin, 0, sizeof(egress_sin));
1024 egress_sin.sin_family = AF_INET;
1025 if (!IsEmptyStr(config.c_ip_addr)) {
1026 egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
1027 if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
1028 egress_sin.sin_addr.s_addr = INADDR_ANY;
1031 /* If this bind fails, no problem; we can still use INADDR_ANY */
1032 bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
1034 rc = connect(IO->SendBuf.fd,
1035 (struct sockaddr_in *)&IO->ConnectMe->Addr,
1036 sizeof(struct sockaddr_in));
1040 SetEVState(IO, eIOConnNow);
1041 EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd);
1042 set_start_callback(event_base, IO, 0);
1043 return IO->NextState;
1045 else if (errno == EINPROGRESS) {
1046 SetEVState(IO, eIOConnWait);
1047 EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd);
1049 ev_io_init(&IO->conn_event,
1050 IO_connestd_callback,
1054 IO->conn_event.data = IO;
1056 ev_io_start(event_base, &IO->conn_event);
1057 ev_timer_start(event_base, &IO->conn_fail);
1058 return IO->NextState;
1061 SetEVState(IO, eIOConnfail);
1062 ev_idle_init(&IO->conn_fail_immediate,
1063 IO_connfailimmediate_callback);
1064 IO->conn_fail_immediate.data = IO;
1065 ev_idle_start(event_base, &IO->conn_fail_immediate);
1068 "connect() = %d failed: %s\n",
1072 StrBufPrintf(IO->ErrMsg,
1073 "Failed to connect: %s",
1075 return IO->NextState;
1077 return IO->NextState;
/*
 * Re-arm the read/write timeout of IO: the timer's repeat interval is
 * set to timeout (seconds) and ev_timer_again() restarts it on
 * event_base.  Per the libev documentation a repeat value of 0 stops the
 * timer instead of restarting it.
 */
1080 void SetNextTimeout(AsyncIO *IO, double timeout)
1082 IO->rw_timeout.repeat = timeout;
1083 ev_timer_again (event_base, &IO->rw_timeout);
/*
 * Bring an existing IO back onto the network loop.
 * Marks IO as eIOAttach, switches to the session context, re-registers
 * the shutdown cleanup watcher on event_base, picks the initial
 * NextState (eReadMessage or eSendReply) and starts I/O via
 * set_start_callback().  Returns IO->NextState.
 */
1087 eNextState ReAttachIO(AsyncIO *IO,
1091 SetEVState(IO, eIOAttach);
1093 become_session(IO->CitContext);
1094 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
1096 IO->NextState = eReadMessage;
1099 IO->NextState = eSendReply;
1101 set_start_callback(event_base, IO, 0);
1103 return IO->NextState;
/*
 * Initialise an AsyncIO for a socket-based client.
 * A clone of the current session context (CC) is attached to IO, with
 * Data as its session_specific_data and a back pointer to IO.  The
 * initial NextState and all callbacks passed in (SendDone, ReadDone,
 * Terminate, DBTerminate, LineReader, ConnFail, Timeout, ShutdownAbort,
 * DNS_Fail) are stored, and the send, receive and line buffers are
 * allocated.
 * NOTE(review): the result of CloneContext() is dereferenced without a
 * NULL check in the lines visible here.
 */
1106 void InitIOStruct(AsyncIO *IO,
1108 eNextState NextState,
1109 IO_LineReaderCallback LineReader,
1110 IO_CallBack DNS_Fail,
1111 IO_CallBack SendDone,
1112 IO_CallBack ReadDone,
1113 IO_CallBack Terminate,
1114 IO_CallBack DBTerminate,
1115 IO_CallBack ConnFail,
1116 IO_CallBack Timeout,
1117 IO_CallBack ShutdownAbort)
1121 IO->CitContext = CloneContext(CC);
1122 IO->CitContext->session_specific_data = Data;
1123 IO->CitContext->IO = IO;
1125 IO->NextState = NextState;
1127 IO->SendDone = SendDone;
1128 IO->ReadDone = ReadDone;
1129 IO->Terminate = Terminate;
1130 IO->DBTerminate = DBTerminate;
1131 IO->LineReader = LineReader;
1132 IO->ConnFail = ConnFail;
1133 IO->Timeout = Timeout;
1134 IO->ShutdownAbort = ShutdownAbort;
1136 IO->DNS.Fail = DNS_Fail;
1138 IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024);
1139 IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
1140 IO->IOBuf = NewStrBuf();
1141 EV_syslog(LOG_DEBUG,
1142 "EVENT: Session lives at %p IO at %p \n",
1147 extern int evcurl_init(AsyncIO *IO);
/*
 * Initialise an AsyncIO for a cURL-driven client.
 * A clone of the current session context (CC) is attached to IO, with
 * Data as its session_specific_data and a back pointer to IO.  The
 * SendDone, Terminate, DBTerminate and ShutdownAbort callbacks are
 * stored, Desc is copied into HttpReq.errdesc and the cURL side is set
 * up by evcurl_init(), whose result is returned.
 * NOTE(review): strcpy() into HttpReq.errdesc is unbounded; unless every
 * caller passes a short constant Desc this can overflow the field --
 * confirm the field size and consider a bounded copy.
 */
1149 int InitcURLIOStruct(AsyncIO *IO,
1152 IO_CallBack SendDone,
1153 IO_CallBack Terminate,
1154 IO_CallBack DBTerminate,
1155 IO_CallBack ShutdownAbort)
1159 IO->CitContext = CloneContext(CC);
1160 IO->CitContext->session_specific_data = Data;
1161 IO->CitContext->IO = IO;
1163 IO->SendDone = SendDone;
1164 IO->Terminate = Terminate;
1165 IO->DBTerminate = DBTerminate;
1166 IO->ShutdownAbort = ShutdownAbort;
1168 strcpy(IO->HttpReq.errdesc, Desc);
1171 return evcurl_init(IO);
/*
 * State of the helper job that aborts another AsyncIO.
 * Members used in this file: IO (the helper's own AsyncIO, passed around
 * as &Ctx->IO) and OtherOne (the AsyncIO whose ShutdownAbort callback is
 * to be called).
 */
1176 typedef struct KillOtherSessionContext {
1179 }KillOtherSessionContext;
/*
 * Terminate callback of the kill helper: logs, releases the contents of
 * IO, wipes the KillOtherSessionContext taken from IO->Data and finally
 * restores the id into IO->ID for post-mortem analysis.
 * NOTE(review): IO is presumably the AsyncIO embedded in that very
 * context, which is why the id is written back after the memset() --
 * confirm, and check that the context is freed in the remaining lines.
 */
1181 eNextState KillTerminate(AsyncIO *IO)
1184 KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data;
1185 EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__);
1187 FreeAsyncIOContents(IO);
1188 memset(Ctx, 0, sizeof(KillOtherSessionContext));
1189 IO->ID = id; /* just for the case we want to analyze it in a coredump */
/*
 * ShutdownAbort-style callback of the kill helper: nothing to clean up,
 * so it just requests termination of the connection.
 */
1195 eNextState KillShutdown(AsyncIO *IO)
1197 return eTerminateConnection;
/*
 * Job body of the kill helper.  Marks IO as eKill, calls the
 * ShutdownAbort callback of the target AsyncIO (Ctx->OtherOne) when one
 * is installed, and then requests termination of the helper itself.
 */
1200 eNextState KillOtherContextNow(AsyncIO *IO)
1202 KillOtherSessionContext *Ctx = IO->Data;
1204 SetEVState(IO, eKill);
1206 if (Ctx->OtherOne->ShutdownAbort != NULL)
1207 Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne);
1208 return eTerminateConnection;
/*
 * Abort the given AsyncIO from within the loop it currently lives in.
 * A zeroed KillOtherSessionContext is allocated and its embedded AsyncIO
 * initialised; depending on IO->NextState the helper is queued on the
 * network loop (QueueEventContext) or on the database loop
 * (QueueDBOperation) with KillOtherContextNow as its job.  Nothing is
 * queued when IO is already terminating, because its queue is unknown.
 * NOTE(review): the malloc() result is passed to memset() without a NULL
 * check.
 * NOTE(review): check that the helper context is released on the paths
 * that do not queue it.
 */
1211 void KillAsyncIOContext(AsyncIO *IO)
1213 KillOtherSessionContext *Ctx;
1215 Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext));
1216 memset(Ctx, 0, sizeof(KillOtherSessionContext));
1218 InitIOStruct(&Ctx->IO,
1233 switch(IO->NextState) {
1246 QueueEventContext(&Ctx->IO, KillOtherContextNow);
1249 QueueDBOperation(&Ctx->IO, KillOtherContextNow);
1251 case eTerminateConnection:
1253 /*hm, its already dying, dunno which Queue its in... */
1259 extern int DebugEventLoopBacktrace;
/*
 * Log the current call stack (at most 50 frames) at LOG_ALERT level.
 * Only active when built with HAVE_BACKTRACE; does nothing when IO is
 * NULL or DebugEventLoopBacktrace is 0.  Each frame is logged with its
 * symbol string when backtrace_symbols() succeeded, otherwise with its
 * raw address.
 * NOTE(review): the array returned by backtrace_symbols() has to be
 * released with free(); that call is not among the lines visible here --
 * confirm.
 */
1260 void EV_backtrace(AsyncIO *IO)
1262 #ifdef HAVE_BACKTRACE
1263 void *stack_frames[50];
1267 if ((IO == NULL) || (DebugEventLoopBacktrace == 0))
1269 size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
1270 strings = backtrace_symbols(stack_frames, size);
1271 for (i = 0; i < size; i++) {
1272 if (strings != NULL) {
1273 EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
1276 EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
/*
 * Return the network event loop's notion of the current time, i.e.
 * ev_now(event_base).
 */
1284 ev_tstamp ctdl_ev_now (void)
1286 return ev_now(event_base);