3 * Copyright (c) 1998-2012 by the citadel.org team
5 * This program is open source software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
29 #include <sys/types.h>
32 #if TIME_WITH_SYS_TIME
33 # include <sys/time.h>
37 # include <sys/time.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
54 #include <libcitadel.h>
57 #include "citserver.h"
64 #include "internet_addressing.h"
67 #include "clientsocket.h"
68 #include "locate_host.h"
69 #include "citadel_dirs.h"
71 #include "event_client.h"
72 #include "ctdl_module.h"
/*
 * Cleanup-watcher callback: fetches the AsyncIO stored in watcher->data,
 * logs the call and runs the IO's ShutdownAbort handler.
 * The handler is mandatory (asserted).
 */
74 static void IO_abort_shutdown_callback(struct ev_loop *loop,
78 AsyncIO *IO = watcher->data;
79 EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
81 assert(IO->ShutdownAbort);
82 IO->ShutdownAbort(IO);
86 /*------------------------------------------------------------------------------
88 *----------------------------------------------------------------------------*/
89 extern int evdb_count;
90 extern pthread_mutex_t DBEventQueueMutex;
91 extern HashList *DBInboundEventQueue;
92 extern struct ev_loop *event_db;
93 extern ev_async DBAddJob;
94 extern ev_async DBExitEventLoop;
/*
 * Hands an AsyncIO to the DB event loop.
 * Allocates an IOAddHandler, arms the db_abort_by_shutdown cleanup watcher
 * on event_db (callback IO_abort_shutdown_callback, data = IO), stores the
 * handler in DBInboundEventQueue while holding DBEventQueueMutex and then
 * signals event_db through the DBAddJob async watcher.
 * NOTE(review): no NULL check on the malloc() result is visible here - confirm.
 */
96 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
101 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
104 ev_cleanup_init(&IO->db_abort_by_shutdown,
105 IO_abort_shutdown_callback);
106 IO->db_abort_by_shutdown.data = IO;
107 ev_cleanup_start(event_db, &IO->db_abort_by_shutdown);
/* the inbound queue is only modified while the mutex is held */
109 pthread_mutex_lock(&DBEventQueueMutex);
110 EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
112 Put(DBInboundEventQueue, IKEY(i), h, NULL);
113 pthread_mutex_unlock(&DBEventQueueMutex);
/* notify the DB loop that a new job is waiting */
115 ev_async_send (event_db, &DBAddJob);
116 EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
/*
 * Tears down the DB-loop registration of an AsyncIO: logs, removes the
 * db_abort_by_shutdown cleanup watcher from event_db and requires the IO to
 * carry a Terminate handler (asserted).
 */
120 void ShutDownDBCLient(AsyncIO *IO)
122 CitContext *Ctx =IO->CitContext;
125 EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
126 ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
128 assert(IO->Terminate);
/*
 * Idle-watcher callback that runs the pending DB step of an AsyncIO.
 * Takes the IO from watcher->data, calls become_session() with its
 * CitContext, stops the db_unwind_stack idle watcher and dispatches on the
 * value returned by IO->NextDBOperation (which must be set - asserted).
 * The eTerminateConnection branch stops the idle and cleanup watchers and
 * calls ShutDownDBCLient().
 * NOTE(review): presumably installed on db_unwind_stack by NextDBOperation();
 * the registration line is not visible - confirm.
 */
133 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
135 AsyncIO *IO = watcher->data;
136 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
137 become_session(IO->CitContext);
139 ev_idle_stop(event_db, &IO->db_unwind_stack);
141 assert(IO->NextDBOperation);
142 switch (IO->NextDBOperation(IO))
156 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
158 case eTerminateConnection:
160 ev_idle_stop(event_db, &IO->db_unwind_stack);
161 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
162 ShutDownDBCLient(IO);
/*
 * Schedules CB as the next DB step of the IO: stores it in
 * IO->NextDBOperation, initialises the db_unwind_stack idle watcher with the
 * IO as watcher data and starts it on event_db.
 */
166 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
168 IO->NextDBOperation = CB;
169 ev_idle_init(&IO->db_unwind_stack,
171 IO->db_unwind_stack.data = IO;
172 ev_idle_start(event_db, &IO->db_unwind_stack);
176 /*------------------------------------------------------------------------------
178 *----------------------------------------------------------------------------*/
179 extern int evbase_count;
180 extern pthread_mutex_t EventQueueMutex;
181 extern HashList *InboundEventQueue;
182 extern struct ev_loop *event_base;
183 extern ev_async AddJob;
184 extern ev_async ExitEventLoop;
/*
 * Hands an AsyncIO to the client event loop.
 * Allocates an IOAddHandler, arms the abort_by_shutdown cleanup watcher on
 * event_base (callback IO_abort_shutdown_callback, data = IO), stores the
 * handler in InboundEventQueue while holding EventQueueMutex and then
 * signals event_base through the AddJob async watcher.
 * NOTE(review): no NULL check on the malloc() result is visible here - confirm.
 */
187 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
192 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
195 ev_cleanup_init(&IO->abort_by_shutdown,
196 IO_abort_shutdown_callback);
197 IO->abort_by_shutdown.data = IO;
198 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
/* the inbound queue is only modified while the mutex is held */
200 pthread_mutex_lock(&EventQueueMutex);
201 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
203 Put(InboundEventQueue, IKEY(i), h, NULL);
204 pthread_mutex_unlock(&EventQueueMutex);
/* notify the event loop that a new job is waiting */
206 ev_async_send (event_base, &AddJob);
207 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
211 extern eNextState evcurl_handle_start(AsyncIO *IO);
/*
 * Queues an AsyncIO for cURL processing on the client event loop.
 * The handler's attach function is evcurl_handle_start; the handler is put
 * on InboundEventQueue under EventQueueMutex and event_base is signalled
 * through the AddJob async watcher.
 * NOTE(review): no NULL check on the malloc() result is visible here - confirm.
 */
213 eNextState QueueCurlContext(AsyncIO *IO)
218 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
220 h->EvAttch = evcurl_handle_start;
222 pthread_mutex_lock(&EventQueueMutex);
223 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
225 Put(InboundEventQueue, IKEY(i), h, NULL);
226 pthread_mutex_unlock(&EventQueueMutex);
228 ev_async_send (event_base, &AddJob);
229 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
233 void DestructCAres(AsyncIO *IO);
/*
 * Frees the string buffers attached to an AsyncIO (IOBuf, SendBuf.Buf,
 * RecvBuf.Buf, HttpReq.ReplyData) and its ConnectMe URL, and sets the state
 * of the IO's CitContext to CON_IDLE.
 */
234 void FreeAsyncIOContents(AsyncIO *IO)
236 CitContext *Ctx = IO->CitContext;
238 FreeStrBuf(&IO->IOBuf);
239 FreeStrBuf(&IO->SendBuf.Buf);
240 FreeStrBuf(&IO->RecvBuf.Buf);
244 FreeURL(&IO->ConnectMe);
245 FreeStrBuf(&IO->HttpReq.ReplyData);
248 Ctx->state = CON_IDLE;
/*
 * Stops every client-side watcher of the IO on event_base - the rw_timeout
 * and conn_fail timers, the unwind_stack idle watcher and the conn/send/recv
 * io watchers - and closes the socket when SendBuf.fd is non-zero.
 * NOTE(review): 0 is used as the "no descriptor" marker although 0 is a
 * valid file descriptor value.
 */
254 void StopClientWatchers(AsyncIO *IO)
256 ev_timer_stop (event_base, &IO->rw_timeout);
257 ev_timer_stop(event_base, &IO->conn_fail);
258 ev_idle_stop(event_base, &IO->unwind_stack);
260 ev_io_stop(event_base, &IO->conn_event);
261 ev_io_stop(event_base, &IO->send_event);
262 ev_io_stop(event_base, &IO->recv_event);
264 if (IO->SendBuf.fd != 0) {
265 close(IO->SendBuf.fd);
/*
 * Tears down the client-loop side of an AsyncIO: unregisters the
 * abort_by_shutdown cleanup watcher, stops all client watchers via
 * StopClientWatchers() and, when a DNS channel exists, destroys it, stops
 * the DNS io watchers and clears the channel pointer.
 * A Terminate handler must be present (asserted).
 * NOTE(review): ares_destroy() runs before the DNS io watchers are stopped;
 * confirm that this order is intended.
 */
271 void ShutDownCLient(AsyncIO *IO)
273 CitContext *Ctx =IO->CitContext;
276 EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
278 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
279 StopClientWatchers(IO);
281 if (IO->DNS.Channel != NULL) {
282 ares_destroy(IO->DNS.Channel);
283 EV_DNS_LOG_STOP(DNS.recv_event);
284 EV_DNS_LOG_STOP(DNS.send_event);
285 ev_io_stop(event_base, &IO->DNS.recv_event);
286 ev_io_stop(event_base, &IO->DNS.send_event);
287 IO->DNS.Channel = NULL;
289 assert(IO->Terminate);
/*
 * Consumes data that has already been received for the IO.
 * While the buffer is not empty and the IO is in one of the read states
 * (eReadMessage, eReadMore, eReadFile, eReadPayload) one unit is extracted:
 * in eReadFile state through WriteIOBAlreadyRead() (switching to eSendReply
 * once that reports eReadSuccess), otherwise through the IO's LineReader if
 * one is set, else through StrBufChunkSipLine().
 * Whenever the reader did not ask for more data, the recv watcher is stopped
 * and IO->ReadDone (mandatory, asserted) chooses the next state.
 * The trailing switch on IO->NextState restarts the send or recv watcher as
 * required by the state reached.
 */
293 eReadState HandleInbound(AsyncIO *IO)
295 const char *Err = NULL;
296 eReadState Finished = eBufferNotEmpty;
298 become_session(IO->CitContext);
300 while ((Finished == eBufferNotEmpty) &&
301 ((IO->NextState == eReadMessage)||
302 (IO->NextState == eReadMore)||
303 (IO->NextState == eReadFile)||
304 (IO->NextState == eReadPayload)))
307 * lex line reply in callback,
308 * or do it ourselves.
309 * i.e. as nnn-blabla means continue reading in SMTP
311 if ((IO->NextState == eReadFile) &&
312 (Finished == eBufferNotEmpty))
314 Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
315 if (Finished == eReadSuccess)
317 IO->NextState = eSendReply;
320 else if (IO->LineReader)
321 Finished = IO->LineReader(IO);
323 Finished = StrBufChunkSipLine(IO->IOBuf,
327 case eMustReadMore: /// read new from socket...
329 case eBufferNotEmpty: /* shouldn't happen... */
330 case eReadSuccess: /// done for now...
332 case eReadFail: /// WHUT?
/* the reader is done with this unit: let ReadDone pick the next state */
337 if (Finished != eMustReadMore) {
338 assert(IO->ReadDone);
339 ev_io_stop(event_base, &IO->recv_event);
340 IO->NextState = IO->ReadDone(IO);
341 Finished = StrBufCheckBuffer(&IO->RecvBuf);
345 switch (IO->NextState) {
347 ev_io_start(event_base, &IO->send_event);
351 assert(IO->SendDone);
352 IO->NextState = IO->SendDone(IO);
353 ev_io_start(event_base, &IO->send_event);
358 ev_io_start(event_base, &IO->recv_event);
360 case eTerminateConnection:
/*
 * Write-readiness callback of the send io watcher.
 * Takes the IO from watcher->data, calls become_session() and writes one
 * chunk: FileSendChunked() for the file-sending state, otherwise
 * StrBuf_write_one_chunk_callback() on the watcher's fd. An error text
 * reported by FileSendChunked() is copied into IO->ErrMsg.
 * Once the send watcher has been stopped, the IO's state machine is
 * advanced through SendDone / ReadDone (both asserted where used) and the
 * send or recv watcher is restarted according to the resulting state.
 * The /tmp/foolog_ev_* file logging is debug output.
 */
378 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
381 AsyncIO *IO = watcher->data;
382 const char *errmsg = NULL;
384 become_session(IO->CitContext);
390 const char *pch = ChrPtr(IO->SendBuf.Buf);
391 const char *pchh = IO->SendBuf.ReadWritePointer;
397 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
398 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
399 ((CitContext*)(IO->CitContext))->ServiceName,
402 fd = fopen(fn, "a+");
/* NOTE(review): fd is used right away without checking fopen() for NULL */
403 fprintf(fd, "Send: BufSize: %ld BufContent: [",
405 rv = fwrite(pchh, nbytes, 1, fd);
406 if (!rv) printf("failed to write debug to %s!\n", fn);
409 switch (IO->NextState) {
411 rc = FileSendChunked(&IO->IOB, &errmsg);
413 StrBufPlain(IO->ErrMsg, errmsg, -1);
416 rc = StrBuf_write_one_chunk_callback(watcher->fd,
422 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
428 ev_io_stop(event_base, &IO->send_event);
429 switch (IO->NextState) {
431 assert(IO->SendDone);
432 IO->NextState = IO->SendDone(IO);
434 if ((IO->NextState == eTerminateConnection) ||
435 (IO->NextState == eAbort) )
438 ev_io_start(event_base, &IO->send_event);
442 if (IO->IOB.ChunkSendRemain > 0) {
443 ev_io_start(event_base, &IO->recv_event);
445 assert(IO->ReadDone);
446 IO->NextState = IO->ReadDone(IO);
447 switch(IO->NextState) {
456 ev_io_start(event_base,
464 case eTerminateConnection:
471 if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
473 IO->NextState = eReadMore;
478 if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
483 ev_io_start(event_base, &IO->recv_event);
489 * we now live in another queue,
490 * so we have to unregister.
492 ev_cleanup_stop(loop, &IO->abort_by_shutdown);
497 case eTerminateConnection:
506 /* else : must write more. */
/*
 * Kicks off I/O for an IO according to IO->NextState: depending on the
 * state the recv watcher is started on event_base, or become_session() is
 * called and IO_send_callback() is invoked directly on the send watcher.
 * eTerminateConnection has a branch of its own.
 * NOTE(review): the case labels selecting the first two branches are not
 * visible here.
 */
509 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
511 switch(IO->NextState) {
515 ev_io_start(event_base, &IO->recv_event);
521 become_session(IO->CitContext);
522 IO_send_callback(loop, &IO->send_event, revents);
528 case eTerminateConnection:
/*
 * Timer callback for the rw_timeout watcher.
 * Stops the timer, calls become_session(), and - when a socket is open
 * (SendBuf.fd non-zero) - stops the send/recv watchers, closes the socket
 * and resets both fds to 0. The IO's Timeout handler then decides how to
 * proceed.
 * NOTE(review): rw_timeout is stopped a second time inside the fd branch.
 */
536 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
538 AsyncIO *IO = watcher->data;
540 ev_timer_stop (event_base, &IO->rw_timeout);
541 become_session(IO->CitContext);
543 if (IO->SendBuf.fd != 0)
545 ev_io_stop(event_base, &IO->send_event);
546 ev_io_stop(event_base, &IO->recv_event);
547 ev_timer_stop (event_base, &IO->rw_timeout);
548 close(IO->SendBuf.fd);
549 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
553 switch (IO->Timeout(IO))
/*
 * Timer callback for the conn_fail watcher.
 * Stops the timer; when a socket is open (SendBuf.fd non-zero) it stops the
 * conn/send/recv watchers and the rw_timeout timer, closes the socket and
 * resets both fds to 0. It then calls become_session() and dispatches on
 * the result of the IO's ConnFail handler (mandatory, asserted).
 */
563 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
565 AsyncIO *IO = watcher->data;
567 ev_timer_stop (event_base, &IO->conn_fail);
569 if (IO->SendBuf.fd != 0)
571 ev_io_stop(loop, &IO->conn_event);
572 ev_io_stop(event_base, &IO->send_event);
573 ev_io_stop(event_base, &IO->recv_event);
574 ev_timer_stop (event_base, &IO->rw_timeout);
575 close(IO->SendBuf.fd);
576 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
578 become_session(IO->CitContext);
580 assert(IO->ConnFail);
581 switch (IO->ConnFail(IO))
/*
 * Callback for the conn_fail_immediate idle watcher, which EvConnectSock()
 * arms when connect() fails right away.
 * Stops the idle watcher, closes the socket if one is open and resets both
 * fds to 0, calls become_session() and dispatches on the result of the IO's
 * ConnFail handler (mandatory, asserted).
 */
592 IO_connfailimmediate_callback(struct ev_loop *loop,
596 AsyncIO *IO = watcher->data;
598 ev_idle_stop (event_base, &IO->conn_fail_immediate);
600 if (IO->SendBuf.fd != 0)
602 close(IO->SendBuf.fd);
603 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
605 become_session(IO->CitContext);
607 assert(IO->ConnFail);
608 switch (IO->ConnFail(IO))
/*
 * Callback of the conn_event io watcher: stops that watcher and the
 * conn_fail timer, then starts regular I/O through set_start_callback().
 */
619 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
621 AsyncIO *IO = watcher->data;
623 ev_io_stop(loop, &IO->conn_event);
624 ev_timer_stop (event_base, &IO->conn_fail);
625 set_start_callback(loop, IO, revents);
/*
 * Read-readiness callback of the recv io watcher.
 * Depending on IO->NextState it reads through FileRecvChunked() (copying a
 * reported error text into IO->ErrMsg and switching to eSendReply once
 * ChunkSendRemain reaches 0) or through StrBuf_read_one_chunk_callback() on
 * the watcher's fd.
 * A read of 0 bytes is routed to the IO's Timeout handler; a result of -1
 * stops all client watchers and logs the socket as invalid.
 * The /tmp/foolog_ev_* file logging is debug output.
 */
628 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
632 AsyncIO *IO = watcher->data;
634 switch (IO->NextState) {
636 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
638 StrBufPlain(IO->ErrMsg, errmsg, -1);
641 if (IO->IOB.ChunkSendRemain == 0)
643 IO->NextState = eSendReply;
650 nbytes = StrBuf_read_one_chunk_callback(watcher->fd,
662 const char *pch = ChrPtr(IO->RecvBuf.Buf);
663 const char *pchh = IO->RecvBuf.ReadWritePointer;
668 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
669 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
670 ((CitContext*)(IO->CitContext))->ServiceName,
673 fd = fopen(fn, "a+");
/* NOTE(review): fd is used right away without checking fopen() for NULL */
674 fprintf(fd, "Read: BufSize: %ld BufContent: [",
676 rv = fwrite(pchh, nbytes, 1, fd);
677 if (!rv) printf("failed to write debug to %s!\n", fn);
684 } else if (nbytes == 0) {
687 switch (IO->Timeout(IO))
695 } else if (nbytes == -1) {
696 // FD is gone. kick it.
697 StopClientWatchers(IO);
699 "EVENT: Socket Invalid! %s \n",
/*
 * Idle-watcher callback run after a DNS lookup.
 * Takes the IO from watcher->data, calls become_session() and dispatches on
 * the result of the query's PostDNS handler (mandatory, asserted). One
 * branch of that switch invokes the IO's DNS.Fail handler (also asserted)
 * and dispatches on its result.
 */
707 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
709 AsyncIO *IO = watcher->data;
710 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
711 become_session(IO->CitContext);
712 assert(IO->DNS.Query->PostDNS);
713 switch (IO->DNS.Query->PostDNS(IO))
716 assert(IO->DNS.Fail);
717 switch (IO->DNS.Fail(IO)) {
719 //// StopClientWatchers(IO);
/*
 * Creates a non-blocking socket for IO->ConnectMe and starts connecting.
 *
 *  - The socket family is PF_INET6 or PF_INET depending on ConnectMe->IPv6,
 *    and O_NONBLOCK is added through fcntl(). If socket() or either fcntl()
 *    call fails, the error is logged, a message is stored in IO->ErrMsg and
 *    both fds are reset to 0 (the socket is closed first in the fcntl cases).
 *  - The recv/send io watchers and the conn_fail / rw_timeout timers are
 *    initialised with the IO as watcher data.
 *  - In the non-IPv6 path the socket is bound (best effort, result ignored)
 *    to config.c_ip_addr when that setting is non-empty, before connect().
 *  - connect(): on immediate success set_start_callback() is run and the
 *    rw_timeout timer started; on EINPROGRESS the conn_event watcher and the
 *    conn_fail timer are started; any other failure arms the
 *    conn_fail_immediate idle watcher, logs the error and stores it in
 *    IO->ErrMsg.
 *
 * All visible return statements return IO->NextState.
 *
 * NOTE(review): the test "s_addr == !INADDR_ANY" compares against 1, since
 * INADDR_ANY is 0. inet_addr() reports failure with INADDR_NONE, so an
 * unparsable c_ip_addr is probably not caught by this check - confirm.
 * NOTE(review): the non-IPv6 connect() passes a (struct sockaddr_in *) cast
 * where connect() takes a struct sockaddr pointer - confirm.
 */
730 eNextState EvConnectSock(AsyncIO *IO,
732 double first_rw_timeout,
735 struct sockaddr_in egress_sin;
739 become_session(IO->CitContext);
742 IO->NextState = eReadMessage;
745 IO->NextState = eSendReply;
748 IO->SendBuf.fd = IO->RecvBuf.fd =
750 (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
754 if (IO->SendBuf.fd < 0) {
756 "EVENT: socket() failed: %s\n",
759 StrBufPrintf(IO->ErrMsg,
760 "Failed to create socket: %s",
762 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
765 fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
768 "EVENT: unable to get socket flags! %s \n",
770 StrBufPrintf(IO->ErrMsg,
771 "Failed to get socket flags: %s",
773 close(IO->SendBuf.fd);
774 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
777 fdflags = fdflags | O_NONBLOCK;
778 if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
781 "EVENT: unable to set socket nonblocking flags! %s \n",
783 StrBufPrintf(IO->ErrMsg,
784 "Failed to set socket flags: %s",
786 close(IO->SendBuf.fd);
787 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
790 /* TODO: maybe we could use offsetof() to calc the position of data...
791 * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
793 ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
794 IO->recv_event.data = IO;
795 ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
796 IO->send_event.data = IO;
798 ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
799 IO->conn_fail.data = IO;
800 ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
801 IO->rw_timeout.data = IO;
806 /* for debugging you may bypass it like this:
807 * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
808 * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
809 * inet_addr("127.0.0.1");
811 if (IO->ConnectMe->IPv6) {
812 rc = connect(IO->SendBuf.fd,
813 &IO->ConnectMe->Addr,
814 sizeof(struct sockaddr_in6));
817 /* If citserver is bound to a specific IP address on the host, make
818 * sure we use that address for outbound connections.
821 memset(&egress_sin, 0, sizeof(egress_sin));
822 egress_sin.sin_family = AF_INET;
823 if (!IsEmptyStr(config.c_ip_addr)) {
824 egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
825 if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
826 egress_sin.sin_addr.s_addr = INADDR_ANY;
829 /* If this bind fails, no problem; we can still use INADDR_ANY */
830 bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
832 rc = connect(IO->SendBuf.fd,
833 (struct sockaddr_in *)&IO->ConnectMe->Addr,
834 sizeof(struct sockaddr_in));
838 EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
839 set_start_callback(event_base, IO, 0);
840 ev_timer_start(event_base, &IO->rw_timeout);
841 return IO->NextState;
843 else if (errno == EINPROGRESS) {
844 EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n");
846 ev_io_init(&IO->conn_event,
847 IO_connestd_callback,
851 IO->conn_event.data = IO;
853 ev_io_start(event_base, &IO->conn_event);
854 ev_timer_start(event_base, &IO->conn_fail);
855 return IO->NextState;
858 ev_idle_init(&IO->conn_fail_immediate,
859 IO_connfailimmediate_callback);
860 IO->conn_fail_immediate.data = IO;
861 ev_idle_start(event_base, &IO->conn_fail_immediate);
863 EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno));
864 StrBufPrintf(IO->ErrMsg,
865 "Failed to connect: %s",
867 return IO->NextState;
869 return IO->NextState;
/*
 * Sets the repeat interval of the IO's rw_timeout timer to `timeout` and
 * re-arms the timer on event_base with ev_timer_again().
 */
872 void SetNextTimeout(AsyncIO *IO, double timeout)
874 IO->rw_timeout.repeat = timeout;
875 ev_timer_again (event_base, &IO->rw_timeout);
/*
 * Puts an IO back onto the client event loop: calls become_session(),
 * registers the abort_by_shutdown cleanup watcher on event_base again, sets
 * IO->NextState to eReadMessage or eSendReply (the selecting condition is
 * not visible here) and starts I/O through set_start_callback().
 * Returns the resulting IO->NextState.
 */
879 eNextState ReAttachIO(AsyncIO *IO,
884 become_session(IO->CitContext);
885 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
887 IO->NextState = eReadMessage;
890 IO->NextState = eSendReply;
892 set_start_callback(event_base, IO, 0);
894 return IO->NextState;
/*
 * Initialises an AsyncIO for socket based client I/O.
 * Gives the IO a context obtained from CloneContext(CC) and stores Data as
 * that context's session_specific_data, records the initial state and all
 * handler callbacks (SendDone, ReadDone, Terminate, LineReader, ConnFail,
 * Timeout, ShutdownAbort, DNS.Fail), and creates the send/receive buffers
 * (1024 bytes initial size each) plus the IOBuf line buffer.
 */
897 void InitIOStruct(AsyncIO *IO,
899 eNextState NextState,
900 IO_LineReaderCallback LineReader,
901 IO_CallBack DNS_Fail,
902 IO_CallBack SendDone,
903 IO_CallBack ReadDone,
904 IO_CallBack Terminate,
905 IO_CallBack ConnFail,
907 IO_CallBack ShutdownAbort)
911 IO->CitContext = CloneContext(CC);
912 ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
914 IO->NextState = NextState;
916 IO->SendDone = SendDone;
917 IO->ReadDone = ReadDone;
918 IO->Terminate = Terminate;
919 IO->LineReader = LineReader;
920 IO->ConnFail = ConnFail;
921 IO->Timeout = Timeout;
922 IO->ShutdownAbort = ShutdownAbort;
924 IO->DNS.Fail = DNS_Fail;
926 IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024);
927 IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
928 IO->IOBuf = NewStrBuf();
930 "EVENT: Session lives at %p IO at %p \n",
935 extern int evcurl_init(AsyncIO *IO);
/*
 * Initialises an AsyncIO for cURL based requests.
 * Gives the IO a context obtained from CloneContext(CC) and stores Data as
 * that context's session_specific_data, records the SendDone, Terminate and
 * ShutdownAbort handlers, copies Desc into HttpReq.errdesc and returns the
 * result of evcurl_init().
 * NOTE(review): the strcpy() into errdesc is unbounded; the size of that
 * buffer is not visible here - confirm Desc always fits.
 */
937 int InitcURLIOStruct(AsyncIO *IO,
940 IO_CallBack SendDone,
941 IO_CallBack Terminate,
942 IO_CallBack ShutdownAbort)
946 IO->CitContext = CloneContext(CC);
947 ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
949 IO->SendDone = SendDone;
950 IO->Terminate = Terminate;
951 IO->ShutdownAbort = ShutdownAbort;
953 strcpy(IO->HttpReq.errdesc, Desc);
956 return evcurl_init(IO);
/*
 * Logs the current call stack at LOG_ALERT level when HAVE_BACKTRACE is
 * defined. Up to 50 frames are captured with backtrace(); each frame is
 * logged either as its backtrace_symbols() string or as a raw address.
 * NOTE(review): releasing the array returned by backtrace_symbols() is not
 * visible here - confirm it is freed.
 */
960 void EV_backtrace(AsyncIO *IO)
962 #ifdef HAVE_BACKTRACE
963 void *stack_frames[50];
968 size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
969 strings = backtrace_symbols(stack_frames, size);
970 for (i = 0; i < size; i++) {
972 EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
974 EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
/*
 * Returns the value of ev_now() for the client event loop (event_base).
 */
981 ev_tstamp ctdl_ev_now (void)
983 return ev_now(event_base);