2 * Copyright (c) 1998-2012 by the citadel.org team
4 * This program is open source software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 3.
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
18 #include <sys/socket.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
25 #include <libcitadel.h>
27 #include "ctdl_module.h"
28 #include "event_client.h"
/*
 * Human-readable names of the event-loop states. SetEVState() indexes this
 * table with an eIOState value and copies the selected name into the session
 * context's lastcmdname.
 * NOTE(review): only part of the table is visible in this excerpt; the order
 * of the entries presumably has to match the eIOState enum -- confirm.
 */
30 ConstStr IOStates[] = {
37 {HKEY("DB Terminate")},
40 {HKEY("IO Connect Socket")},
43 {HKEY("IO ConnFail")},
44 {HKEY("IO ConnFail Now")},
45 {HKEY("IO Conn Now")},
46 {HKEY("IO Conn Wait")},
/* NOTE(review): the next name looks like a typo for "Curl Shutdown"; it is a runtime string, so it is left untouched here. */
49 {HKEY("Curl Shotdown")},
50 {HKEY("Curl More IO")},
51 {HKEY("Curl Got IO")},
52 {HKEY("Curl Got Data")},
53 {HKEY("Curl Got Status")},
54 {HKEY("C-Ares Start")},
55 {HKEY("C-Ares IO Done")},
56 {HKEY("C-Ares Finished")},
57 {HKEY("C-Ares exit")},
/*
 * Record the current event-loop state of an IO job in its session context.
 * Copies the name belonging to State from IOStates[] into CCC->lastcmdname;
 * len + 1 bytes are copied, presumably to include the terminating NUL.
 * NOTE(review): State is used as an array index and IO->CitContext is
 * dereferenced without any check visible in this excerpt -- confirm against
 * the lines not shown.
 */
62 void SetEVState(AsyncIO *IO, eIOState State)
65 CitContext* CCC = IO->CitContext;
67 memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1);
72 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
73 static void IO_abort_shutdown_callback(struct ev_loop *loop,
78 /*------------------------------------------------------------------------------
80 *----------------------------------------------------------------------------*/
81 extern int evdb_count;
82 extern pthread_mutex_t DBEventQueueMutex;
83 extern pthread_mutex_t DBEventExitQueueMutex;
84 extern HashList *DBInboundEventQueue;
85 extern struct ev_loop *event_db;
86 extern ev_async DBAddJob;
87 extern ev_async DBExitEventLoop;
/*
 * Hand an IO job over to the database event loop.
 * Allocates an IOAddHandler, registers it in DBInboundEventQueue while
 * holding DBEventQueueMutex, and then wakes the DB loop through the DBAddJob
 * async watcher while holding DBEventExitQueueMutex.
 * NOTE(review): the result of malloc() is not visibly checked; the statements
 * that fill in h and produce the return value are not part of this excerpt.
 */
89 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
95 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
/* Prepare the cleanup watcher whose callback aborts this IO. */
98 ev_cleanup_init(&IO->db_abort_by_shutdown,
99 IO_abort_shutdown_callback);
100 IO->db_abort_by_shutdown.data = IO;
102 pthread_mutex_lock(&DBEventQueueMutex);
/* No inbound queue any more: the DB loop is shutting down. */
103 if (DBInboundEventQueue == NULL)
105 /* shutting down... */
107 EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
108 pthread_mutex_unlock(&DBEventQueueMutex);
111 EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
113 Put(DBInboundEventQueue, IKEY(i), h, NULL);
114 pthread_mutex_unlock(&DBEventQueueMutex);
/* event_db is tested and signalled only while DBEventExitQueueMutex is held. */
116 pthread_mutex_lock(&DBEventExitQueueMutex);
117 if (event_db == NULL)
119 pthread_mutex_unlock(&DBEventExitQueueMutex);
122 ev_async_send (event_db, &DBAddJob);
123 pthread_mutex_unlock(&DBEventExitQueueMutex);
125 EVQM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
/*
 * Stop the watchers this IO has on the DB loop (event_db): the
 * shutdown-cleanup watcher and the db_unwind_stack idle watcher.
 * The state is recorded as eDBStop first.
 */
129 void StopDBWatchers(AsyncIO *IO)
131 SetEVState(IO, eDBStop);
132 ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
133 ev_idle_stop(event_db, &IO->db_unwind_stack);
/*
 * Terminate an IO job from within the DB loop.
 * Records the state eDBTerm, logs the termination and requires (via assert)
 * that a DBTerminate callback is set.
 * NOTE(review): the invocation of that callback and the use of Ctx are not
 * visible in this excerpt.
 */
136 void ShutDownDBCLient(AsyncIO *IO)
138 CitContext *Ctx =IO->CitContext;
141 SetEVState(IO, eDBTerm);
142 EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
145 assert(IO->DBTerminate);
/*
 * Idle-watcher callback: runs the DB operation stored in
 * IO->NextDBOperation (which must be set) and dispatches on the eNextState
 * it returns. watcher->data carries the AsyncIO.
 * NOTE(review): the return-type line and most of the switch cases are not
 * visible in this excerpt.
 */
150 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
152 AsyncIO *IO = watcher->data;
154 SetEVState(IO, eDBNext);
155 IO->Now = ev_now(event_db);
156 EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
157 become_session(IO->CitContext);
/* The idle watcher is stopped before the operation is run. */
159 ev_idle_stop(event_db, &IO->db_unwind_stack);
161 assert(IO->NextDBOperation);
162 switch (IO->NextDBOperation(IO))
176 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
/* Terminating: stop the remaining DB watchers and shut the client down. */
178 case eTerminateConnection:
180 ev_idle_stop(event_db, &IO->db_unwind_stack);
181 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
182 ShutDownDBCLient(IO);
/*
 * Schedule CB as the next DB operation of this IO.
 * Stores CB in IO->NextDBOperation and starts the db_unwind_stack idle
 * watcher on event_db, presumably so that CB is invoked from the loop rather
 * than from the current call stack -- confirm.
 * NOTE(review): the callback passed to ev_idle_init() and the return
 * statement are not visible in this excerpt.
 */
186 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
188 SetEVState(IO, eQDBNext);
189 IO->NextDBOperation = CB;
190 ev_idle_init(&IO->db_unwind_stack,
192 IO->db_unwind_stack.data = IO;
193 ev_idle_start(event_db, &IO->db_unwind_stack);
197 /*------------------------------------------------------------------------------
199 *----------------------------------------------------------------------------*/
200 extern int evbase_count;
201 extern pthread_mutex_t EventQueueMutex;
202 extern pthread_mutex_t EventExitQueueMutex;
203 extern HashList *InboundEventQueue;
204 extern struct ev_loop *event_base;
205 extern ev_async AddJob;
206 extern ev_async ExitEventLoop;
/*
 * Callback of the cleanup watchers set up by QueueDBOperation() and
 * QueueEventContext(). It records the state eIOAbort, refreshes IO->Now and
 * calls the IO's ShutdownAbort handler, which must be set (asserted).
 * NOTE(review): IO->Now is taken from event_base even though the watcher may
 * also be registered for the DB loop -- confirm this is intended.
 */
208 static void IO_abort_shutdown_callback(struct ev_loop *loop,
212 AsyncIO *IO = watcher->data;
214 SetEVState(IO, eIOAbort);
215 EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
216 IO->Now = ev_now(event_base);
217 assert(IO->ShutdownAbort);
218 IO->ShutdownAbort(IO);
/*
 * Hand an IO job over to the network event loop.
 * Allocates an IOAddHandler, registers it in InboundEventQueue while holding
 * EventQueueMutex, and then wakes the loop through the AddJob async watcher
 * while holding EventExitQueueMutex.
 * NOTE(review): the result of malloc() is not visibly checked; the statements
 * that fill in h and produce the return value are not part of this excerpt.
 */
222 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
227 SetEVState(IO, eIOQ);
228 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
/* Prepare the cleanup watcher whose callback aborts this IO. */
231 ev_cleanup_init(&IO->abort_by_shutdown,
232 IO_abort_shutdown_callback);
233 IO->abort_by_shutdown.data = IO;
235 pthread_mutex_lock(&EventQueueMutex);
/* No inbound queue any more: the event loop is shutting down. */
236 if (InboundEventQueue == NULL)
239 /* shutting down... */
240 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
241 pthread_mutex_unlock(&EventQueueMutex);
244 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
246 Put(InboundEventQueue, IKEY(i), h, NULL);
247 pthread_mutex_unlock(&EventQueueMutex);
/* event_base is tested and signalled only while EventExitQueueMutex is held. */
249 pthread_mutex_lock(&EventExitQueueMutex);
250 if (event_base == NULL) {
251 pthread_mutex_unlock(&EventExitQueueMutex);
254 ev_async_send (event_base, &AddJob);
255 pthread_mutex_unlock(&EventExitQueueMutex);
256 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
/*
 * Move an IO job from the network event loop to the DB loop: stop its client
 * watchers (closing the socket when CloseFDs is non-zero and the fd is > 0),
 * then queue CB via QueueDBOperation() and return its result.
 */
260 eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB, int CloseFDs)
262 StopClientWatchers(IO, CloseFDs);
263 return QueueDBOperation(IO, CB);
/*
 * Queue CB for this IO in the network event loop and return the result of
 * QueueEventContext().
 * NOTE(review): a statement preceding the return is not visible in this
 * excerpt.
 */
265 eNextState DBQueueEventContext(AsyncIO *IO, IO_CallBack CB)
268 return QueueEventContext(IO, CB);
271 extern eNextState evcurl_handle_start(AsyncIO *IO);
/*
 * Queue a cURL-driven IO job in the network event loop.
 * Works like QueueEventContext(), except that the handler's attach callback
 * is fixed to evcurl_handle_start and no cleanup watcher is initialised in
 * the visible code.
 * NOTE(review): the result of malloc() is not visibly checked; the return
 * statements are not part of this excerpt.
 */
273 eNextState QueueCurlContext(AsyncIO *IO)
278 SetEVState(IO, eCurlQ);
279 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
281 h->EvAttch = evcurl_handle_start;
283 pthread_mutex_lock(&EventQueueMutex);
/* No inbound queue any more: the event loop is shutting down. */
284 if (InboundEventQueue == NULL)
286 /* shutting down... */
288 EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
289 pthread_mutex_unlock(&EventQueueMutex);
293 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
295 Put(InboundEventQueue, IKEY(i), h, NULL);
296 pthread_mutex_unlock(&EventQueueMutex);
/* event_base is tested and signalled only while EventExitQueueMutex is held. */
298 pthread_mutex_lock(&EventExitQueueMutex);
299 if (event_base == NULL) {
300 pthread_mutex_unlock(&EventExitQueueMutex);
303 ev_async_send (event_base, &AddJob);
304 pthread_mutex_unlock(&EventExitQueueMutex);
306 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
/*
 * Move a cURL-driven IO job to the DB loop: stop its cURL watchers (which
 * also cleans up the easy handle), then queue CB via QueueDBOperation() and
 * return its result.
 */
310 eNextState CurlQueueDBOperation(AsyncIO *IO, IO_CallBack CB)
312 StopCurlWatchers(IO);
313 return QueueDBOperation(IO, CB);
/*
 * Release what an AsyncIO owns: its string buffers (IOBuf, send and receive
 * buffer, HTTP reply data) and the ConnectMe URL. The session context is set
 * to CON_IDLE and detached from the IO (IO->CitContext = NULL).
 * The IO structure itself is not freed in the visible code.
 * NOTE(review): the lines around the Ctx->state assignment are not visible,
 * so it is unclear whether Ctx is checked for NULL first.
 */
317 void FreeAsyncIOContents(AsyncIO *IO)
319 CitContext *Ctx = IO->CitContext;
321 FreeStrBuf(&IO->IOBuf);
322 FreeStrBuf(&IO->SendBuf.Buf);
323 FreeStrBuf(&IO->RecvBuf.Buf);
325 FreeURL(&IO->ConnectMe);
326 FreeStrBuf(&IO->HttpReq.ReplyData);
329 Ctx->state = CON_IDLE;
331 IO->CitContext = NULL;
336 void DestructCAres(AsyncIO *IO);
/*
 * Stop every watcher this IO has on the network event loop: both timers,
 * the unwind-stack idle watcher, the shutdown-cleanup watcher and the
 * connect/send/receive io watchers.
 * If CloseFD is non-zero and the socket fd is > 0, the socket is closed.
 * NOTE(review): this tests fd > 0 while StopCurlWatchers() tests fd != 0;
 * the handling of the fd fields after close() is not visible here.
 */
337 void StopClientWatchers(AsyncIO *IO, int CloseFD)
339 EVM_syslog(LOG_DEBUG, "EVENT StopClientWatchers");
343 ev_timer_stop (event_base, &IO->rw_timeout);
344 ev_timer_stop(event_base, &IO->conn_fail);
345 ev_idle_stop(event_base, &IO->unwind_stack);
346 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
348 ev_io_stop(event_base, &IO->conn_event);
349 ev_io_stop(event_base, &IO->send_event);
350 ev_io_stop(event_base, &IO->recv_event);
352 if (CloseFD && (IO->SendBuf.fd > 0)) {
353 close(IO->SendBuf.fd);
/*
 * Stop every watcher of a cURL-driven IO on the network event loop, clean up
 * its cURL easy handle (the pointer is reset to NULL) and close the socket
 * if the fd is non-zero.
 * NOTE(review): the fd test here is != 0 whereas StopClientWatchers() uses
 * > 0, so a negative fd would be passed to close() -- confirm.
 */
359 void StopCurlWatchers(AsyncIO *IO)
361 EVM_syslog(LOG_DEBUG, "EVENT StopCurlWatchers \n");
363 ev_timer_stop (event_base, &IO->rw_timeout);
364 ev_timer_stop(event_base, &IO->conn_fail);
365 ev_idle_stop(event_base, &IO->unwind_stack);
366 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
368 ev_io_stop(event_base, &IO->conn_event);
369 ev_io_stop(event_base, &IO->send_event);
370 ev_io_stop(event_base, &IO->recv_event);
372 curl_easy_cleanup(IO->HttpReq.chnd);
373 IO->HttpReq.chnd = NULL;
375 if (IO->SendBuf.fd != 0) {
376 close(IO->SendBuf.fd);
/*
 * Terminate an IO job from within the network event loop.
 * Records the state eExit, stops all client watchers (closing the socket),
 * tears down the c-ares channel if one exists, and requires (via assert)
 * that a Terminate callback is set.
 * NOTE(review): the invocation of the Terminate callback and the use of Ctx
 * are not visible in this excerpt.
 */
382 void ShutDownCLient(AsyncIO *IO)
384 CitContext *Ctx =IO->CitContext;
386 SetEVState(IO, eExit);
389 EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
391 StopClientWatchers(IO, 1);
/* NOTE(review): the channel is destroyed before its io watchers are stopped -- confirm the order is safe. */
393 if (IO->DNS.Channel != NULL) {
394 ares_destroy(IO->DNS.Channel);
395 EV_DNS_LOG_STOP(DNS.recv_event);
396 EV_DNS_LOG_STOP(DNS.send_event);
397 ev_io_stop(event_base, &IO->DNS.recv_event);
398 ev_io_stop(event_base, &IO->DNS.send_event);
399 IO->DNS.Channel = NULL;
401 assert(IO->Terminate);
/*
 * Decide what to do after inbound data has been handled, based on
 * IO->NextState: visible branches start the send watcher, call the SendDone
 * handler (which must be set) and dispatch again on its result, stop the
 * client watchers without closing the socket, or start the receive watcher.
 * NOTE(review): the case labels selecting these branches are not visible in
 * this excerpt.
 */
405 void PostInbound(AsyncIO *IO)
407 switch (IO->NextState) {
409 ev_io_start(event_base, &IO->send_event);
413 assert(IO->SendDone);
414 IO->NextState = IO->SendDone(IO);
415 switch (IO->NextState)
424 ev_io_start(event_base, &IO->send_event);
427 StopClientWatchers(IO, 0);
435 ev_io_start(event_base, &IO->recv_event);
437 case eTerminateConnection:
/*
 * Consume buffered input for this IO.
 * Loops while the buffer is not empty and the state is one of the read
 * states (eReadMessage, eReadMore, eReadFile, eReadPayload). In eReadFile
 * state already-read data is written out via WriteIOBAlreadyRead();
 * otherwise the IO's LineReader is used if set, else one line is extracted
 * with StrBufChunkSipLine(). When a unit is complete, the receive watcher is
 * stopped and the ReadDone handler (which must be set) is called.
 * NOTE(review): several lines of the loop body, the switch header and the
 * return statement are not visible in this excerpt.
 */
451 eReadState HandleInbound(AsyncIO *IO)
453 const char *Err = NULL;
454 eReadState Finished = eBufferNotEmpty;
456 become_session(IO->CitContext);
458 while ((Finished == eBufferNotEmpty) &&
459 ((IO->NextState == eReadMessage)||
460 (IO->NextState == eReadMore)||
461 (IO->NextState == eReadFile)||
462 (IO->NextState == eReadPayload)))
465 * lex line reply in callback,
466 * or do it ourselves.
467 * i.e. as nnn-blabla means continue reading in SMTP
469 if ((IO->NextState == eReadFile) &&
470 (Finished == eBufferNotEmpty))
472 Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
473 if (Finished == eReadSuccess)
475 IO->NextState = eSendReply;
478 else if (IO->LineReader)
479 Finished = IO->LineReader(IO);
481 Finished = StrBufChunkSipLine(IO->IOBuf,
485 case eMustReadMore: /// read new from socket...
487 case eBufferNotEmpty: /* shouldn't happen... */
488 case eReadSuccess: /// done for now...
490 case eReadFail: /// WHUT?
/* Anything but "need more data" is handed to the ReadDone handler. */
495 if (Finished != eMustReadMore) {
497 assert(IO->ReadDone);
498 ev_io_stop(event_base, &IO->recv_event);
499 rc = IO->ReadDone(IO);
500 if (rc != eDBQuery) {
502 Finished = StrBufCheckBuffer(&IO->RecvBuf);
/*
 * io-watcher callback for the send direction; watcher->data carries the
 * AsyncIO.
 * Writes either chunked file data (FileSendChunked) or one chunk of the send
 * buffer, depending on IO->NextState. Visible follow-up handling stops the
 * send watcher, calls SendDone / ReadDone and restarts the send or receive
 * watcher according to the resulting state. On a write error other than
 * EAGAIN the connection is torn down and a short timeout is armed.
 * NOTE(review): the return-type line, the case labels and the conditions
 * guarding the individual sections are not visible in this excerpt.
 */
518 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
521 AsyncIO *IO = watcher->data;
522 const char *errmsg = NULL;
524 IO->Now = ev_now(event_base);
525 become_session(IO->CitContext);
/*
 * Debug dump of the not-yet-sent part of the send buffer into a file under
 * /tmp. NOTE(review): the fopen() result is used without a visible check;
 * presumably this section is compiled conditionally -- confirm.
 */
531 const char *pch = ChrPtr(IO->SendBuf.Buf);
532 const char *pchh = IO->SendBuf.ReadWritePointer;
538 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
539 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
540 ((CitContext*)(IO->CitContext))->ServiceName,
543 fd = fopen(fn, "a+");
544 fprintf(fd, "Send: BufSize: %ld BufContent: [",
546 rv = fwrite(pchh, nbytes, 1, fd);
547 if (!rv) printf("failed to write debug to %s!\n", fn);
/* The actual write; which writer is used depends on the state. */
550 switch (IO->NextState) {
552 rc = FileSendChunked(&IO->IOB, &errmsg);
554 StrBufPlain(IO->ErrMsg, errmsg, -1);
557 rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd,
563 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
569 ev_io_stop(event_base, &IO->send_event);
570 switch (IO->NextState) {
572 assert(IO->SendDone);
573 IO->NextState = IO->SendDone(IO);
575 if ((IO->NextState == eTerminateConnection) ||
576 (IO->NextState == eAbort) )
579 ev_io_start(event_base, &IO->send_event);
583 if (IO->IOB.ChunkSendRemain > 0) {
584 ev_io_start(event_base, &IO->recv_event);
585 SetNextTimeout(IO, 100.0);
588 assert(IO->ReadDone);
589 IO->NextState = IO->ReadDone(IO);
590 switch(IO->NextState) {
599 ev_io_start(event_base,
607 case eTerminateConnection:
614 if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
616 IO->NextState = eReadMore;
621 if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
626 ev_io_start(event_base, &IO->recv_event);
632 * we now live in another queue,
633 * so we have to unregister.
635 ev_cleanup_stop(loop, &IO->abort_by_shutdown);
640 case eTerminateConnection:
/* Write error other than EAGAIN: stop watchers, close the socket, record the error and arm a 0.01 timeout. */
646 if (errno != EAGAIN) {
647 StopClientWatchers(IO, 1);
649 "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n",
650 errno, strerror(errno), IO->SendBuf.fd);
651 StrBufPrintf(IO->ErrMsg,
652 "Socket Invalid! [%s]",
654 SetNextTimeout(IO, 0.01);
657 /* else : must write more. */
/*
 * Start the protocol exchange once a connection is usable: the connect-fail
 * timer is stopped, the read/write timeout is started, and depending on
 * IO->NextState either the receive watcher is started (the error message is
 * pre-set to "[while waiting for greeting]") or IO_send_callback() is
 * invoked directly.
 * NOTE(review): the return-type line and the case labels are not visible in
 * this excerpt.
 */
660 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
662 ev_timer_stop(event_base, &IO->conn_fail);
663 ev_timer_start(event_base, &IO->rw_timeout);
665 switch(IO->NextState) {
669 StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0);
670 ev_io_start(event_base, &IO->recv_event);
676 become_session(IO->CitContext);
677 IO_send_callback(loop, &IO->send_event, revents);
683 case eTerminateConnection:
/*
 * Timer callback for the read/write timeout; watcher->data carries the
 * AsyncIO.
 * Stops the timer; if a socket is open (fd != 0, which this file uses as
 * "no socket"), stops the send/receive watchers, closes the socket and
 * resets both fds to 0. Then the IO's Timeout handler is called and its
 * result is dispatched on.
 * NOTE(review): unlike the other handlers, no assert on IO->Timeout is
 * visible before it is called; the switch cases are not part of this excerpt.
 */
691 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
693 AsyncIO *IO = watcher->data;
695 SetEVState(IO, eIOTimeout);
696 IO->Now = ev_now(event_base);
697 ev_timer_stop (event_base, &IO->rw_timeout);
698 become_session(IO->CitContext);
700 if (IO->SendBuf.fd != 0)
702 ev_io_stop(event_base, &IO->send_event);
703 ev_io_stop(event_base, &IO->recv_event);
704 ev_timer_stop (event_base, &IO->rw_timeout);
705 close(IO->SendBuf.fd);
706 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
710 switch (IO->Timeout(IO))
/*
 * Timer callback for a failed or timed-out connect; watcher->data carries
 * the AsyncIO. It is also called directly from IO_connestd_callback().
 * Stops the connect-fail timer; if a socket is open (fd != 0), stops the
 * connect/send/receive watchers and the read/write timer, closes the socket
 * and resets both fds to 0. Then the ConnFail handler (which must be set) is
 * called and its result is dispatched on.
 * NOTE(review): the switch cases are not part of this excerpt.
 */
720 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
722 AsyncIO *IO = watcher->data;
724 SetEVState(IO, eIOConnfail);
725 IO->Now = ev_now(event_base);
726 ev_timer_stop (event_base, &IO->conn_fail);
728 if (IO->SendBuf.fd != 0)
730 ev_io_stop(loop, &IO->conn_event);
731 ev_io_stop(event_base, &IO->send_event);
732 ev_io_stop(event_base, &IO->recv_event);
733 ev_timer_stop (event_base, &IO->rw_timeout);
734 close(IO->SendBuf.fd);
735 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
737 become_session(IO->CitContext);
739 assert(IO->ConnFail);
740 switch (IO->ConnFail(IO))
/*
 * Idle-watcher callback used when connect() failed right away (it is
 * registered in EvConnectSock()); watcher->data carries the AsyncIO.
 * Stops the idle watcher, closes the socket if one is open (fd != 0) and
 * resets both fds to 0, then calls the ConnFail handler (which must be set)
 * and dispatches on its result.
 * NOTE(review): the remaining parameters and the switch cases are not part
 * of this excerpt.
 */
751 IO_connfailimmediate_callback(struct ev_loop *loop,
755 AsyncIO *IO = watcher->data;
757 SetEVState(IO, eIOConnfailNow);
758 IO->Now = ev_now(event_base);
759 ev_idle_stop (event_base, &IO->conn_fail_immediate);
761 if (IO->SendBuf.fd != 0)
763 close(IO->SendBuf.fd);
764 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
766 become_session(IO->CitContext);
768 assert(IO->ConnFail);
769 switch (IO->ConnFail(IO))
/*
 * io-watcher callback fired when a pending non-blocking connect() has
 * finished; watcher->data carries the AsyncIO.
 * Stops the connect watcher and the connect-fail timer, then queries the
 * socket with getsockopt(): if that call succeeds and reports a non-zero
 * so_err, the failure path IO_connfail_callback() is taken; otherwise the
 * protocol is started through set_start_callback().
 * NOTE(review): the first "connect() succeeded." message is logged before
 * the socket error has been examined; the getsockopt() arguments are not
 * fully visible in this excerpt.
 */
780 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
782 AsyncIO *IO = watcher->data;
784 socklen_t lon = sizeof(so_err);
787 SetEVState(IO, eIOConnNow);
788 IO->Now = ev_now(event_base);
789 EVM_syslog(LOG_DEBUG, "connect() succeeded.\n");
791 ev_io_stop(loop, &IO->conn_event);
792 ev_timer_stop(event_base, &IO->conn_fail);
794 err = getsockopt(IO->SendBuf.fd,
800 if ((err == 0) && (so_err != 0))
802 EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n",
805 IO_connfail_callback(loop, &IO->conn_fail, revents);
810 EVM_syslog(LOG_DEBUG, "connect() succeeded\n");
811 set_start_callback(loop, IO, revents);
/*
 * io-watcher callback for the receive direction; watcher->data carries the
 * AsyncIO.
 * Reads either chunked file data (FileRecvChunked) or one chunk into the
 * receive buffer, depending on IO->NextState. A read of 0 bytes, or a read
 * error other than EAGAIN, stops the watchers, closes the socket and arms a
 * short timeout.
 * NOTE(review): the return-type line, the case labels and the branch taken
 * for nbytes > 0 are not visible in this excerpt.
 */
816 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
820 AsyncIO *IO = watcher->data;
822 IO->Now = ev_now(event_base);
823 switch (IO->NextState) {
825 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
827 StrBufPlain(IO->ErrMsg, errmsg, -1);
/* Nothing left of the chunk: switch to eSendReply and stop receiving. */
830 if (IO->IOB.ChunkSendRemain == 0)
832 IO->NextState = eSendReply;
833 assert(IO->ReadDone);
834 ev_io_stop(event_base, &IO->recv_event);
843 nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd,
/*
 * Debug dump of the unread part of the receive buffer into a file under
 * /tmp. NOTE(review): the fopen() result is used without a visible check;
 * presumably this section is compiled conditionally -- confirm.
 */
855 const char *pch = ChrPtr(IO->RecvBuf.Buf);
856 const char *pchh = IO->RecvBuf.ReadWritePointer;
861 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
862 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
863 ((CitContext*)(IO->CitContext))->ServiceName,
866 fd = fopen(fn, "a+");
867 fprintf(fd, "Read: BufSize: %ld BufContent: [",
869 rv = fwrite(pchh, nbytes, 1, fd);
870 if (!rv) printf("failed to write debug to %s!\n", fn);
877 } else if (nbytes == 0) {
878 StopClientWatchers(IO, 1);
879 SetNextTimeout(IO, 0.01);
881 } else if (nbytes == -1) {
882 if (errno != EAGAIN) {
883 // FD is gone. kick it.
884 StopClientWatchers(IO, 1);
886 "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n",
887 errno, strerror(errno), IO->SendBuf.fd);
888 StrBufPrintf(IO->ErrMsg,
889 "Socket Invalid! [%s]",
891 SetNextTimeout(IO, 0.01);
/*
 * Idle-watcher callback run after a DNS query; watcher->data carries the
 * AsyncIO.
 * Calls the query's PostDNS handler (which must be set) and dispatches on
 * its result; one visible branch calls the IO's DNS.Fail handler (also
 * asserted) and dispatches on that result in turn.
 * NOTE(review): the return-type line and the case labels are not visible in
 * this excerpt.
 */
898 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
900 AsyncIO *IO = watcher->data;
902 SetEVState(IO, eCaresFinished);
903 IO->Now = ev_now(event_base);
904 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
905 become_session(IO->CitContext);
906 assert(IO->DNS.Query->PostDNS);
907 switch (IO->DNS.Query->PostDNS(IO))
910 assert(IO->DNS.Fail);
911 switch (IO->DNS.Fail(IO)) {
913 //// StopClientWatchers(IO);
/*
 * Create a non-blocking socket for IO->ConnectMe, set up the send/receive
 * watchers and the two timers, and start connecting.
 *
 * The address family follows IO->ConnectMe->IPv6. For IPv4 the socket is
 * first bound to the address configured in config.c_ip_addr (if any); a
 * failing bind() is deliberately ignored.
 *
 * Outcomes visible in the code:
 *  - connect() succeeds at once: the protocol is started right away;
 *  - errno == EINPROGRESS: the conn_event watcher and the conn_fail timer
 *    are started and the result is handled in IO_connestd_callback();
 *  - any other failure: an idle watcher reports it through
 *    IO_connfailimmediate_callback() and IO->ErrMsg is filled in.
 * On the early failure paths both fds are reset to 0, which this file uses
 * as "no socket". The function returns IO->NextState.
 *
 * NOTE(review): the test "s_addr == !INADDR_ANY" compares against the
 * logical negation of INADDR_ANY; inet_addr() reports failure as
 * INADDR_NONE, which is presumably what was intended -- confirm.
 * NOTE(review): parts of the parameter list and several statements are not
 * visible in this excerpt.
 */
924 eNextState EvConnectSock(AsyncIO *IO,
926 double first_rw_timeout,
929 struct sockaddr_in egress_sin;
933 SetEVState(IO, eIOConnectSock);
934 become_session(IO->CitContext);
937 IO->NextState = eReadMessage;
940 IO->NextState = eSendReply;
943 IO->SendBuf.fd = IO->RecvBuf.fd =
945 (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
949 if (IO->SendBuf.fd < 0) {
951 "EVENT: socket() failed: %s\n",
954 StrBufPrintf(IO->ErrMsg,
955 "Failed to create socket: %s",
957 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
/* Switch the socket to non-blocking mode; on failure it is closed again. */
960 fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
963 "EVENT: unable to get socket %d flags! %s \n",
966 StrBufPrintf(IO->ErrMsg,
967 "Failed to get socket %d flags: %s",
970 close(IO->SendBuf.fd);
971 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
974 fdflags = fdflags | O_NONBLOCK;
975 if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
978 "EVENT: unable to set socket %d nonblocking flags! %s \n",
981 StrBufPrintf(IO->ErrMsg,
982 "Failed to set socket flags: %s",
984 close(IO->SendBuf.fd);
985 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
988 /* TODO: maybe we could use offsetof() to calc the position of data...
989 * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
991 ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
992 IO->recv_event.data = IO;
993 ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
994 IO->send_event.data = IO;
996 ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
997 IO->conn_fail.data = IO;
998 ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
999 IO->rw_timeout.data = IO;
1004 /* for debugging you may bypass it like this:
1005 * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
1006 * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
1007 * inet_addr("127.0.0.1");
1009 if (IO->ConnectMe->IPv6) {
1010 rc = connect(IO->SendBuf.fd,
1011 &IO->ConnectMe->Addr,
1012 sizeof(struct sockaddr_in6));
1015 /* If citserver is bound to a specific IP address on the host, make
1016 * sure we use that address for outbound connections.
1019 memset(&egress_sin, 0, sizeof(egress_sin));
1020 egress_sin.sin_family = AF_INET;
1021 if (!IsEmptyStr(config.c_ip_addr)) {
1022 egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
1023 if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
1024 egress_sin.sin_addr.s_addr = INADDR_ANY;
1027 /* If this bind fails, no problem; we can still use INADDR_ANY */
1028 bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
1030 rc = connect(IO->SendBuf.fd,
1031 (struct sockaddr_in *)&IO->ConnectMe->Addr,
1032 sizeof(struct sockaddr_in));
/* connect() completed immediately: start the protocol right away. */
1036 SetEVState(IO, eIOConnNow);
1037 EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd);
1038 set_start_callback(event_base, IO, 0);
1039 return IO->NextState;
/* Connection in progress: wait on conn_event while the conn_fail timer runs. */
1041 else if (errno == EINPROGRESS) {
1042 SetEVState(IO, eIOConnWait);
1043 EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd);
1045 ev_io_init(&IO->conn_event,
1046 IO_connestd_callback,
1050 IO->conn_event.data = IO;
1052 ev_io_start(event_base, &IO->conn_event);
1053 ev_timer_start(event_base, &IO->conn_fail);
1054 return IO->NextState;
/* Immediate failure: report it from an idle watcher instead of from this call. */
1057 SetEVState(IO, eIOConnfail);
1058 ev_idle_init(&IO->conn_fail_immediate,
1059 IO_connfailimmediate_callback);
1060 IO->conn_fail_immediate.data = IO;
1061 ev_idle_start(event_base, &IO->conn_fail_immediate);
1064 "connect() = %d failed: %s\n",
1068 StrBufPrintf(IO->ErrMsg,
1069 "Failed to connect: %s",
1071 return IO->NextState;
1073 return IO->NextState;
/*
 * Re-arm the read/write timeout of this IO: the timer's repeat value is set
 * to `timeout` and the timer is restarted with ev_timer_again() on
 * event_base.
 */
1076 void SetNextTimeout(AsyncIO *IO, double timeout)
1078 IO->rw_timeout.repeat = timeout;
1079 ev_timer_again (event_base, &IO->rw_timeout);
/*
 * Re-attach an IO to the network event loop: the shutdown-cleanup watcher is
 * started again, IO->NextState is set to eReadMessage or eSendReply, and the
 * protocol is resumed through set_start_callback(). Returns IO->NextState.
 * NOTE(review): the remaining parameters and the condition choosing between
 * the two states are not visible in this excerpt.
 */
1083 eNextState ReAttachIO(AsyncIO *IO,
1087 SetEVState(IO, eIOAttach);
1089 become_session(IO->CitContext);
1090 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
1092 IO->NextState = eReadMessage;
1095 IO->NextState = eSendReply;
1097 set_start_callback(event_base, IO, 0);
1099 return IO->NextState;
/*
 * Initialise an AsyncIO for a socket-based client.
 * The IO gets a clone of the context CC with Data attached as
 * session_specific_data and a back pointer to the IO; the start state and
 * all handler callbacks are stored; send, receive and line buffers are
 * allocated.
 * NOTE(review): one parameter line (presumably declaring Data) and the
 * arguments of the final log call are not visible in this excerpt; the
 * result of CloneContext() is dereferenced without a visible check.
 */
1102 void InitIOStruct(AsyncIO *IO,
1104 eNextState NextState,
1105 IO_LineReaderCallback LineReader,
1106 IO_CallBack DNS_Fail,
1107 IO_CallBack SendDone,
1108 IO_CallBack ReadDone,
1109 IO_CallBack Terminate,
1110 IO_CallBack DBTerminate,
1111 IO_CallBack ConnFail,
1112 IO_CallBack Timeout,
1113 IO_CallBack ShutdownAbort)
/* Context and IO point at each other. */
1117 IO->CitContext = CloneContext(CC);
1118 IO->CitContext->session_specific_data = Data;
1119 IO->CitContext->IO = IO;
1121 IO->NextState = NextState;
1123 IO->SendDone = SendDone;
1124 IO->ReadDone = ReadDone;
1125 IO->Terminate = Terminate;
1126 IO->DBTerminate = DBTerminate;
1127 IO->LineReader = LineReader;
1128 IO->ConnFail = ConnFail;
1129 IO->Timeout = Timeout;
1130 IO->ShutdownAbort = ShutdownAbort;
1132 IO->DNS.Fail = DNS_Fail;
/* Buffers; 1024 is presumably the initial capacity -- confirm against libcitadel. */
1134 IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024);
1135 IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
1136 IO->IOBuf = NewStrBuf();
1137 EV_syslog(LOG_DEBUG,
1138 "EVENT: Session lives at %p IO at %p \n",
1143 extern int evcurl_init(AsyncIO *IO);
/*
 * Initialise an AsyncIO for a cURL-based client.
 * The IO gets a clone of the context CC with Data attached and a back
 * pointer to the IO; the handler callbacks are stored, Desc is copied into
 * IO->HttpReq.errdesc, and the result of evcurl_init() is returned.
 * NOTE(review): strcpy() copies Desc without a visible length check against
 * the size of errdesc -- confirm that callers only pass short descriptions.
 * NOTE(review): the parameter lines declaring Data and Desc are not visible
 * in this excerpt.
 */
1145 int InitcURLIOStruct(AsyncIO *IO,
1148 IO_CallBack SendDone,
1149 IO_CallBack Terminate,
1150 IO_CallBack DBTerminate,
1151 IO_CallBack ShutdownAbort)
1155 IO->CitContext = CloneContext(CC);
1156 IO->CitContext->session_specific_data = Data;
1157 IO->CitContext->IO = IO;
1159 IO->SendDone = SendDone;
1160 IO->Terminate = Terminate;
1161 IO->DBTerminate = DBTerminate;
1162 IO->ShutdownAbort = ShutdownAbort;
1164 strcpy(IO->HttpReq.errdesc, Desc);
1167 return evcurl_init(IO);
/*
 * Helper job used to abort another AsyncIO. The code below accesses two
 * members, IO (the helper's own AsyncIO) and OtherOne (the IO to abort).
 * NOTE(review): the member declarations are not visible in this excerpt.
 */
1172 typedef struct KillOtherSessionContext {
1175 }KillOtherSessionContext;
/*
 * Terminate handler of the kill helper job: frees the contents of its IO
 * and wipes the whole KillOtherSessionContext with zeros. The IO's ID is
 * written back afterwards from the local `id`, purely as a debugging aid.
 * NOTE(review): the declaration of `id`, the release of Ctx and the return
 * statement are not visible in this excerpt. IO presumably lives inside Ctx
 * (KillAsyncIOContext() passes &Ctx->IO), which is why the memset also
 * clears it -- confirm.
 */
1177 eNextState KillTerminate(AsyncIO *IO)
1180 KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data;
1181 EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__);
1183 FreeAsyncIOContents(IO);
1184 memset(Ctx, 0, sizeof(KillOtherSessionContext));
1185 IO->ID = id; /* just for the case we want to analyze it in a coredump */
/* Handler of the kill helper job that simply requests termination of the job. */
1191 eNextState KillShutdown(AsyncIO *IO)
1193 return eTerminateConnection;
/*
 * Job body of the kill helper: calls the ShutdownAbort handler of the
 * targeted IO (Ctx->OtherOne) if it has one, then asks for the helper itself
 * to be terminated by returning eTerminateConnection.
 */
1196 eNextState KillOtherContextNow(AsyncIO *IO)
1198 KillOtherSessionContext *Ctx = IO->Data;
1200 SetEVState(IO, eKill);
1202 if (Ctx->OtherOne->ShutdownAbort != NULL)
1203 Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne);
1204 return eTerminateConnection;
/*
 * Abort the given IO by means of a helper job: a zeroed
 * KillOtherSessionContext is allocated, its IO is initialised, and depending
 * on IO->NextState the helper is queued either on the network event loop or
 * on the DB loop with KillOtherContextNow as its job.
 * NOTE(review): the malloc() result is passed to memset() without a visible
 * NULL check; the InitIOStruct() arguments and the case labels are not part
 * of this excerpt.
 */
1207 void KillAsyncIOContext(AsyncIO *IO)
1209 KillOtherSessionContext *Ctx;
1211 Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext));
1212 memset(Ctx, 0, sizeof(KillOtherSessionContext));
1214 InitIOStruct(&Ctx->IO,
1229 switch(IO->NextState) {
1242 QueueEventContext(&Ctx->IO, KillOtherContextNow);
1245 QueueDBOperation(&Ctx->IO, KillOtherContextNow);
1247 case eTerminateConnection:
1249 /* Hm, it's already dying; we don't know which queue it's in... */
1255 extern int DebugEventLoopBacktrace;
/*
 * Log the current call stack (at most 50 frames) at LOG_ALERT level, one
 * line per frame. Only active when HAVE_BACKTRACE is defined; nothing is
 * logged when IO is NULL or DebugEventLoopBacktrace is 0.
 * If backtrace_symbols() returned NULL, the raw frame addresses are logged
 * instead of symbol names.
 * NOTE(review): no free() of the array returned by backtrace_symbols() is
 * visible in this excerpt -- confirm it is released.
 */
1256 void EV_backtrace(AsyncIO *IO)
1258 #ifdef HAVE_BACKTRACE
1259 void *stack_frames[50];
1263 if ((IO == NULL) || (DebugEventLoopBacktrace == 0))
1265 size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
1266 strings = backtrace_symbols(stack_frames, size);
1267 for (i = 0; i < size; i++) {
1268 if (strings != NULL) {
1269 EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
1272 EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
/* Return the network event loop's notion of the current time, i.e. ev_now(event_base). */
1280 ev_tstamp ctdl_ev_now (void)
1282 return ev_now(event_base);