3 * Copyright (c) 1998-2012 by the citadel.org team
5 * This program is open source software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
29 #include <sys/types.h>
32 #if TIME_WITH_SYS_TIME
33 # include <sys/time.h>
37 # include <sys/time.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
54 #include <libcitadel.h>
57 #include "citserver.h"
64 #include "internet_addressing.h"
67 #include "clientsocket.h"
68 #include "locate_host.h"
69 #include "citadel_dirs.h"
71 #include "event_client.h"
72 #include "ctdl_module.h"
/* Forward declaration: the timeout handler is called by the send and receive
 * callbacks before its definition appears further down. */
74 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
/*
 * Cleanup-watcher callback; the queueing functions in this file register it
 * through ev_cleanup_init(). It recovers the AsyncIO from watcher->data, logs
 * the event and invokes the ShutdownAbort handler of that IO.
 */
76 static void IO_abort_shutdown_callback(struct ev_loop *loop,
80 AsyncIO *IO = watcher->data;
81 EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
/* The handler is mandatory: a missing ShutdownAbort trips the assert. */
83 assert(IO->ShutdownAbort);
84 IO->ShutdownAbort(IO);
88 /*------------------------------------------------------------------------------
90 *----------------------------------------------------------------------------*/
/*
 * State of the database event queue. These are extern declarations only;
 * the definitions are not part of this section.
 */
91 extern int evdb_count;
/* Held by the queueing code below while DBInboundEventQueue is modified. */
92 extern pthread_mutex_t DBEventQueueMutex;
93 extern HashList *DBInboundEventQueue;
/* Event loop on which the db_* watchers are started by the functions below. */
94 extern struct ev_loop *event_db;
/* Async watcher signalled with ev_async_send() after a job has been queued. */
95 extern ev_async DBAddJob;
96 extern ev_async DBExitEventLoop;
/*
 * Hand a database operation over to the DB event loop.
 *
 * Visible steps: allocate an IOAddHandler record, register
 * IO_abort_shutdown_callback as a cleanup watcher on event_db, insert the
 * record into DBInboundEventQueue while holding DBEventQueueMutex, then
 * signal the loop through the DBAddJob async watcher.
 *
 * IO - async I/O context the operation belongs to.
 * CB - callback for the operation; presumably stored in the handler record,
 *      but that assignment is not visible here - TODO confirm.
 *
 * NOTE(review): no check of the malloc() result is visible - confirm that
 * allocation failure is handled.
 */
98 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
103 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
/* Arm the shutdown hook for this IO on the DB loop. */
106 ev_cleanup_init(&IO->db_abort_by_shutdown,
107 IO_abort_shutdown_callback);
108 IO->db_abort_by_shutdown.data = IO;
109 ev_cleanup_start(event_db, &IO->db_abort_by_shutdown);
/* Insert the record while holding the queue mutex. */
111 pthread_mutex_lock(&DBEventQueueMutex);
112 EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
114 Put(DBInboundEventQueue, IKEY(i), h, NULL);
115 pthread_mutex_unlock(&DBEventQueueMutex);
/* Wake the DB loop only after the mutex has been released. */
117 ev_async_send (event_db, &DBAddJob);
118 EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
/*
 * Tear down the database side of an AsyncIO: log the termination, remove the
 * shutdown cleanup watcher from event_db and require a DBTerminate handler.
 * The handler call itself is presumably on a line not shown here.
 */
122 void ShutDownDBCLient(AsyncIO *IO)
124 CitContext *Ctx =IO->CitContext;
127 EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
128 ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
130 assert(IO->DBTerminate);
/*
 * Idle-watcher callback that runs the next DB step of an AsyncIO.
 * It calls become_session() with the CitContext of the IO, stops the
 * db_unwind_stack idle watcher and dispatches on the value returned by
 * IO->NextDBOperation (which must be set).
 */
135 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
137 AsyncIO *IO = watcher->data;
138 EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
139 become_session(IO->CitContext);
/* Stop the idle watcher before the step is executed. */
141 ev_idle_stop(event_db, &IO->db_unwind_stack);
143 assert(IO->NextDBOperation);
144 switch (IO->NextDBOperation(IO))
/* The case label(s) guarding the next statement are not visible here. */
158 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
/* Termination path: stop the DB watchers, then shut the DB client down. */
160 case eTerminateConnection:
162 ev_idle_stop(event_db, &IO->db_unwind_stack);
163 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
164 ShutDownDBCLient(IO);
/*
 * Schedule CB as the next DB step for IO: the callback is stored in
 * IO->NextDBOperation and the db_unwind_stack idle watcher is initialised
 * and started on event_db. The watcher callback argument and the return
 * statement are on lines not shown here.
 */
168 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
170 IO->NextDBOperation = CB;
171 ev_idle_init(&IO->db_unwind_stack,
173 IO->db_unwind_stack.data = IO;
174 ev_idle_start(event_db, &IO->db_unwind_stack);
178 /*------------------------------------------------------------------------------
180 *----------------------------------------------------------------------------*/
/*
 * State of the client I/O event queue. These are extern declarations only;
 * the definitions are not part of this section.
 */
181 extern int evbase_count;
/* Held by the queueing code below while InboundEventQueue is modified. */
182 extern pthread_mutex_t EventQueueMutex;
183 extern HashList *InboundEventQueue;
/* Event loop used for the network watchers and timers in this file. */
184 extern struct ev_loop *event_base;
/* Async watcher signalled with ev_async_send() after a job has been queued. */
185 extern ev_async AddJob;
186 extern ev_async ExitEventLoop;
/*
 * Hand an AsyncIO over to the client event loop (event_base).
 *
 * Visible steps: allocate an IOAddHandler record, register
 * IO_abort_shutdown_callback as a cleanup watcher on event_base, insert the
 * record into InboundEventQueue while holding EventQueueMutex, then signal
 * the loop through the AddJob async watcher.
 *
 * IO - async I/O context to queue.
 * CB - callback for the job; presumably stored in the handler record, but
 *      that assignment is not visible here - TODO confirm.
 *
 * NOTE(review): no check of the malloc() result is visible - confirm that
 * allocation failure is handled.
 */
189 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
194 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
/* Arm the shutdown hook for this IO on the client loop. */
197 ev_cleanup_init(&IO->abort_by_shutdown,
198 IO_abort_shutdown_callback);
199 IO->abort_by_shutdown.data = IO;
200 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
/* Insert the record while holding the queue mutex. */
202 pthread_mutex_lock(&EventQueueMutex);
203 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
205 Put(InboundEventQueue, IKEY(i), h, NULL);
206 pthread_mutex_unlock(&EventQueueMutex);
/* Wake the client loop only after the mutex has been released. */
208 ev_async_send (event_base, &AddJob);
209 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
/* Prototype only (definition not in this section); used below as the attach
 * callback of queued curl jobs. */
213 extern eNextState evcurl_handle_start(AsyncIO *IO);
/*
 * Queue a curl-driven AsyncIO on the client event loop.
 * The handler record uses evcurl_handle_start as its attach callback; it is
 * inserted into InboundEventQueue under EventQueueMutex and the loop is then
 * signalled through AddJob. Unlike QueueEventContext, no cleanup watcher is
 * registered in the visible lines.
 *
 * NOTE(review): no check of the malloc() result is visible - confirm that
 * allocation failure is handled.
 */
215 eNextState QueueCurlContext(AsyncIO *IO)
220 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
222 h->EvAttch = evcurl_handle_start;
/* Insert the record while holding the queue mutex. */
224 pthread_mutex_lock(&EventQueueMutex);
225 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
227 Put(InboundEventQueue, IKEY(i), h, NULL);
228 pthread_mutex_unlock(&EventQueueMutex);
230 ev_async_send (event_base, &AddJob);
231 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
/* Prototype only; neither the definition nor a call is visible in this section. */
235 void DestructCAres(AsyncIO *IO);
/*
 * Release the resources referenced by an AsyncIO: IOBuf, the send and
 * receive buffers, the ConnectMe URL and the HTTP reply data. The state of
 * the associated CitContext is then set to CON_IDLE.
 */
236 void FreeAsyncIOContents(AsyncIO *IO)
238 CitContext *Ctx = IO->CitContext;
240 FreeStrBuf(&IO->IOBuf);
241 FreeStrBuf(&IO->SendBuf.Buf);
242 FreeStrBuf(&IO->RecvBuf.Buf);
246 FreeURL(&IO->ConnectMe);
247 FreeStrBuf(&IO->HttpReq.ReplyData);
250 Ctx->state = CON_IDLE;
/*
 * Stop the client watchers of IO on event_base (read/write timeout timer,
 * connect-failure timer, unwind-stack idle watcher and the connect, send and
 * receive I/O watchers) and close the socket when its descriptor is non-zero.
 *
 * NOTE(review): 0 serves as the "no socket" marker although 0 is a valid
 * descriptor value - confirm this cannot collide.
 */
256 void StopClientWatchers(AsyncIO *IO)
258 ev_timer_stop (event_base, &IO->rw_timeout);
259 ev_timer_stop(event_base, &IO->conn_fail);
260 ev_idle_stop(event_base, &IO->unwind_stack);
262 ev_io_stop(event_base, &IO->conn_event);
263 ev_io_stop(event_base, &IO->send_event);
264 ev_io_stop(event_base, &IO->recv_event);
266 if (IO->SendBuf.fd != 0) {
267 close(IO->SendBuf.fd);
/*
 * Shut down the network side of an AsyncIO: remove the shutdown cleanup
 * watcher from event_base, stop the client watchers and, when a DNS channel
 * exists, destroy it and stop its I/O watchers. A Terminate handler must be
 * set; its call is presumably on a line not shown here.
 */
273 void ShutDownCLient(AsyncIO *IO)
275 CitContext *Ctx =IO->CitContext;
278 EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
280 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
281 StopClientWatchers(IO);
/* DNS teardown: destroy the channel, stop both DNS watchers, clear the pointer. */
283 if (IO->DNS.Channel != NULL) {
284 ares_destroy(IO->DNS.Channel);
285 EV_DNS_LOG_STOP(DNS.recv_event);
286 EV_DNS_LOG_STOP(DNS.send_event);
287 ev_io_stop(event_base, &IO->DNS.recv_event);
288 ev_io_stop(event_base, &IO->DNS.send_event);
289 IO->DNS.Channel = NULL;
291 assert(IO->Terminate);
/*
 * Dispatch on IO->NextState and start the matching I/O watcher.
 * Most case labels are not visible here; the visible branches start the send
 * watcher, run SendDone and then start the send watcher, or start the
 * receive watcher.
 */
295 void PostInbound(AsyncIO *IO)
297 switch (IO->NextState) {
299 ev_io_start(event_base, &IO->send_event);
/* This branch lets SendDone choose the next state before sending resumes. */
303 assert(IO->SendDone);
304 IO->NextState = IO->SendDone(IO);
305 ev_io_start(event_base, &IO->send_event);
310 ev_io_start(event_base, &IO->recv_event);
312 case eTerminateConnection:
/*
 * Process data that is already buffered for IO.
 *
 * While the result is eBufferNotEmpty and NextState is one of the read
 * states (eReadMessage, eReadMore, eReadFile, eReadPayload), the loop either
 * calls WriteIOBAlreadyRead() (eReadFile; success switches the state to
 * eSendReply), calls the LineReader of the IO, or falls back to
 * StrBufChunkSipLine(). When the result is not eMustReadMore, the receive
 * watcher is stopped, ReadDone chooses the next state and the receive buffer
 * is checked again.
 *
 * Returns an eReadState; the return statement is not visible here.
 */
326 eReadState HandleInbound(AsyncIO *IO)
328 const char *Err = NULL;
329 eReadState Finished = eBufferNotEmpty;
331 become_session(IO->CitContext);
333 while ((Finished == eBufferNotEmpty) &&
334 ((IO->NextState == eReadMessage)||
335 (IO->NextState == eReadMore)||
336 (IO->NextState == eReadFile)||
337 (IO->NextState == eReadPayload)))
340 * lex line reply in callback,
341 * or do it ourselves.
342 * i.e. as nnn-blabla means continue reading in SMTP
344 if ((IO->NextState == eReadFile) &&
345 (Finished == eBufferNotEmpty))
347 Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
348 if (Finished == eReadSuccess)
350 IO->NextState = eSendReply;
353 else if (IO->LineReader)
354 Finished = IO->LineReader(IO);
356 Finished = StrBufChunkSipLine(IO->IOBuf,
360 case eMustReadMore: /// read new from socket...
362 case eBufferNotEmpty: /* shouldn't happen... */
363 case eReadSuccess: /// done for now...
365 case eReadFail: /// WHUT?
/* Anything but "need more input" is handed to the ReadDone handler. */
370 if (Finished != eMustReadMore) {
371 assert(IO->ReadDone);
372 ev_io_stop(event_base, &IO->recv_event);
373 IO->NextState = IO->ReadDone(IO);
374 Finished = StrBufCheckBuffer(&IO->RecvBuf);
/*
 * Write-ready callback of the send watcher.
 *
 * Visible behaviour: call become_session() for the IO, append the pending
 * send buffer to a log file under /tmp (any guard around these debug lines
 * is not visible here), write either a file chunk (FileSendChunked) or one
 * chunk of the send buffer, and then advance the state machine: stop the
 * send watcher, run SendDone or ReadDone and start the send or receive
 * watcher required by the new state. One branch, whose condition is not
 * visible, hands over to IO_Timeout_callback().
 *
 * NOTE(review): the debug-log lines use the FILE pointer returned by fopen()
 * without a visible NULL check - confirm they are debug-only and guarded.
 */
385 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
388 AsyncIO *IO = watcher->data;
389 const char *errmsg = NULL;
391 become_session(IO->CitContext);
/* pch is the buffer start, pchh the current read/write position inside it. */
397 const char *pch = ChrPtr(IO->SendBuf.Buf);
398 const char *pchh = IO->SendBuf.ReadWritePointer;
/* Number of bytes from the current position to the end of the buffer. */
404 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
405 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
406 ((CitContext*)(IO->CitContext))->ServiceName,
409 fd = fopen(fn, "a+");
410 fprintf(fd, "Send: BufSize: %ld BufContent: [",
412 rv = fwrite(pchh, nbytes, 1, fd);
413 if (!rv) printf("failed to write debug to %s!\n", fn);
/* Perform the actual write; the path depends on the current state. */
416 switch (IO->NextState) {
418 rc = FileSendChunked(&IO->IOB, &errmsg);
420 StrBufPlain(IO->ErrMsg, errmsg, -1);
423 rc = StrBuf_write_one_chunk_callback(watcher->fd,
429 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
/* The send watcher is stopped here and restarted below where needed. */
435 ev_io_stop(event_base, &IO->send_event);
436 switch (IO->NextState) {
438 assert(IO->SendDone);
439 IO->NextState = IO->SendDone(IO);
441 if ((IO->NextState == eTerminateConnection) ||
442 (IO->NextState == eAbort) )
445 ev_io_start(event_base, &IO->send_event);
/* File transfer with data remaining: start the receive watcher and reset the timeout. */
449 if (IO->IOB.ChunkSendRemain > 0) {
450 ev_io_start(event_base, &IO->recv_event);
451 SetNextTimeout(IO, 100.0);
454 assert(IO->ReadDone);
455 IO->NextState = IO->ReadDone(IO);
456 switch(IO->NextState) {
465 ev_io_start(event_base,
473 case eTerminateConnection:
480 if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
482 IO->NextState = eReadMore;
487 if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
492 ev_io_start(event_base, &IO->recv_event);
498 * we now live in another queue,
499 * so we have to unregister.
501 ev_cleanup_stop(loop, &IO->abort_by_shutdown);
506 case eTerminateConnection:
512 IO_Timeout_callback(loop, &IO->rw_timeout, revents);
514 /* else : must write more. */
/*
 * Start I/O for IO according to IO->NextState: the visible branches start
 * the receive watcher, or call become_session() and invoke
 * IO_send_callback() directly. Case labels other than eTerminateConnection
 * are not visible here.
 */
517 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
519 switch(IO->NextState) {
523 ev_io_start(event_base, &IO->recv_event);
/* The send callback is invoked directly here instead of starting the send watcher. */
529 become_session(IO->CitContext);
530 IO_send_callback(loop, &IO->send_event, revents);
536 case eTerminateConnection:
/*
 * Read/write timeout handler; also called directly from the send and
 * receive callbacks. It stops the rw_timeout timer, calls become_session()
 * and, when a socket is recorded (descriptor non-zero), stops the send and
 * receive watchers, closes the socket and resets both descriptors to 0.
 * Afterwards it dispatches on the result of IO->Timeout.
 *
 * NOTE(review): unlike the other handlers, no assert on IO->Timeout is
 * visible before the call - confirm it is always set.
 */
544 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
546 AsyncIO *IO = watcher->data;
548 ev_timer_stop (event_base, &IO->rw_timeout);
549 become_session(IO->CitContext);
551 if (IO->SendBuf.fd != 0)
553 ev_io_stop(event_base, &IO->send_event);
554 ev_io_stop(event_base, &IO->recv_event);
555 ev_timer_stop (event_base, &IO->rw_timeout);
556 close(IO->SendBuf.fd);
/* Send and receive side share one descriptor; 0 marks it as closed. */
557 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
561 switch (IO->Timeout(IO))
/*
 * Callback of the conn_fail timer. It stops that timer and, when a socket
 * is recorded, stops the connect, send and receive watchers plus the
 * rw_timeout timer, closes the socket and resets both descriptors to 0.
 * It then calls become_session() and dispatches on the result of the
 * mandatory ConnFail handler.
 */
571 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
573 AsyncIO *IO = watcher->data;
575 ev_timer_stop (event_base, &IO->conn_fail);
577 if (IO->SendBuf.fd != 0)
579 ev_io_stop(loop, &IO->conn_event);
580 ev_io_stop(event_base, &IO->send_event);
581 ev_io_stop(event_base, &IO->recv_event);
582 ev_timer_stop (event_base, &IO->rw_timeout);
583 close(IO->SendBuf.fd);
584 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
586 become_session(IO->CitContext);
588 assert(IO->ConnFail);
589 switch (IO->ConnFail(IO))
/*
 * Idle-watcher callback that EvConnectSock schedules when connect() fails
 * with an error other than EINPROGRESS. It stops the conn_fail_immediate
 * watcher, closes a recorded socket and resets both descriptors to 0, calls
 * become_session() and dispatches on the result of the mandatory ConnFail
 * handler.
 */
600 IO_connfailimmediate_callback(struct ev_loop *loop,
604 AsyncIO *IO = watcher->data;
606 ev_idle_stop (event_base, &IO->conn_fail_immediate);
608 if (IO->SendBuf.fd != 0)
610 close(IO->SendBuf.fd);
611 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
613 become_session(IO->CitContext);
615 assert(IO->ConnFail);
616 switch (IO->ConnFail(IO))
/*
 * Callback of the conn_event watcher that EvConnectSock registers while a
 * connect() is in progress. It stops that watcher and the conn_fail timer,
 * then starts regular I/O through set_start_callback().
 */
627 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
629 AsyncIO *IO = watcher->data;
631 ev_io_stop(loop, &IO->conn_event);
632 ev_timer_stop (event_base, &IO->conn_fail);
633 set_start_callback(loop, IO, revents);
/*
 * Read-ready callback of the receive watcher.
 *
 * Depending on IO->NextState, data is read with FileRecvChunked() or with
 * StrBuf_read_one_chunk_callback(). In the file branch, a ChunkSendRemain of
 * 0 switches the state to eSendReply and stops the receive watcher. The
 * visible result handling: nbytes == 0 is routed to IO_Timeout_callback(),
 * nbytes == -1 stops the client watchers and logs an invalid-socket message.
 * The first branch of that if/else chain is not visible here.
 *
 * NOTE(review): the debug-log lines use the FILE pointer returned by fopen()
 * without a visible NULL check - confirm they are debug-only and guarded.
 */
636 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
640 AsyncIO *IO = watcher->data;
642 switch (IO->NextState) {
644 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
646 StrBufPlain(IO->ErrMsg, errmsg, -1);
649 if (IO->IOB.ChunkSendRemain == 0)
651 IO->NextState = eSendReply;
652 assert(IO->ReadDone);
653 ev_io_stop(event_base, &IO->recv_event);
662 nbytes = StrBuf_read_one_chunk_callback(watcher->fd,
/* pch is the buffer start, pchh the current read/write position inside it. */
674 const char *pch = ChrPtr(IO->RecvBuf.Buf);
675 const char *pchh = IO->RecvBuf.ReadWritePointer;
680 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
681 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
682 ((CitContext*)(IO->CitContext))->ServiceName,
685 fd = fopen(fn, "a+");
686 fprintf(fd, "Read: BufSize: %ld BufContent: [",
688 rv = fwrite(pchh, nbytes, 1, fd);
689 if (!rv) printf("failed to write debug to %s!\n", fn);
696 } else if (nbytes == 0) {
697 IO_Timeout_callback(loop, &IO->rw_timeout, revents);
699 } else if (nbytes == -1) {
700 // FD is gone. kick it.
701 StopClientWatchers(IO);
703 "EVENT: Socket Invalid! %s \n",
/*
 * Idle-watcher callback run after a DNS query. It logs the event, calls
 * become_session() and dispatches on the result of the PostDNS handler of
 * the query. One branch, whose case label is not visible here, calls the
 * DNS.Fail handler of the IO. Both handlers are mandatory.
 */
711 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
713 AsyncIO *IO = watcher->data;
714 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
715 become_session(IO->CitContext);
716 assert(IO->DNS.Query->PostDNS);
717 switch (IO->DNS.Query->PostDNS(IO))
720 assert(IO->DNS.Fail);
721 switch (IO->DNS.Fail(IO)) {
723 //// StopClientWatchers(IO);
/*
 * Create a socket for IO, make it non-blocking and start connecting it to
 * the address in IO->ConnectMe.
 *
 * Visible steps:
 *  - NextState is set to eReadMessage or eSendReply (the condition is not
 *    visible here).
 *  - socket() is called with PF_INET6 or PF_INET depending on
 *    IO->ConnectMe->IPv6; the descriptor is stored in both SendBuf.fd and
 *    RecvBuf.fd.
 *  - O_NONBLOCK is added through fcntl(). If socket() or fcntl() fails, the
 *    error is logged and stored in IO->ErrMsg and the descriptors are reset
 *    to 0 (the fcntl paths close the socket first).
 *  - The receive/send watchers and the conn_fail/rw_timeout timers are
 *    initialised with IO as their data pointer.
 *  - For IPv4 the socket is first bound to config.c_ip_addr when that is set;
 *    the result of bind() is deliberately ignored.
 *  - connect() outcome: immediate success starts I/O and the rw_timeout
 *    timer; EINPROGRESS registers conn_event and starts the conn_fail timer;
 *    any other error schedules IO_connfailimmediate_callback through an idle
 *    watcher and records the error text.
 *
 * conn_timeout     - initial timeout of the conn_fail timer.
 * first_rw_timeout - initial timeout of the rw_timeout timer.
 *
 * Every visible return yields IO->NextState.
 */
734 eNextState EvConnectSock(AsyncIO *IO,
736 double first_rw_timeout,
739 struct sockaddr_in egress_sin;
743 become_session(IO->CitContext);
746 IO->NextState = eReadMessage;
749 IO->NextState = eSendReply;
/* Send and receive side share a single descriptor. */
752 IO->SendBuf.fd = IO->RecvBuf.fd =
754 (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
758 if (IO->SendBuf.fd < 0) {
760 "EVENT: socket() failed: %s\n",
763 StrBufPrintf(IO->ErrMsg,
764 "Failed to create socket: %s",
766 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
/* Fetch the current flags so that O_NONBLOCK can be added to them. */
769 fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
772 "EVENT: unable to get socket flags! %s \n",
774 StrBufPrintf(IO->ErrMsg,
775 "Failed to get socket flags: %s",
777 close(IO->SendBuf.fd);
778 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
781 fdflags = fdflags | O_NONBLOCK;
782 if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
785 "EVENT: unable to set socket nonblocking flags! %s \n",
787 StrBufPrintf(IO->ErrMsg,
788 "Failed to set socket flags: %s",
790 close(IO->SendBuf.fd);
791 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
794 /* TODO: maybe we could use offsetof() to calc the position of data...
795 * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
797 ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
798 IO->recv_event.data = IO;
799 ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
800 IO->send_event.data = IO;
802 ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
803 IO->conn_fail.data = IO;
804 ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
805 IO->rw_timeout.data = IO;
810 /* for debugging you may bypass it like this:
811 * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
812 * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
813 * inet_addr("127.0.0.1");
815 if (IO->ConnectMe->IPv6) {
816 rc = connect(IO->SendBuf.fd,
817 &IO->ConnectMe->Addr,
818 sizeof(struct sockaddr_in6));
821 /* If citserver is bound to a specific IP address on the host, make
822 * sure we use that address for outbound connections.
825 memset(&egress_sin, 0, sizeof(egress_sin));
826 egress_sin.sin_family = AF_INET;
827 if (!IsEmptyStr(config.c_ip_addr)) {
828 egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
// NOTE(review): !INADDR_ANY is a logical negation (1 when INADDR_ANY is 0),
// so this comparison does not test for the INADDR_NONE error return of
// inet_addr(); confirm whether INADDR_NONE was intended.
829 if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
830 egress_sin.sin_addr.s_addr = INADDR_ANY;
833 /* If this bind fails, no problem; we can still use INADDR_ANY */
834 bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
836 rc = connect(IO->SendBuf.fd,
837 (struct sockaddr_in *)&IO->ConnectMe->Addr,
838 sizeof(struct sockaddr_in));
/* Immediate success: start I/O right away and arm the read/write timeout. */
842 EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
843 set_start_callback(event_base, IO, 0);
844 ev_timer_start(event_base, &IO->rw_timeout);
845 return IO->NextState;
/* Connection in progress: wait on conn_event, with conn_fail as the deadline. */
847 else if (errno == EINPROGRESS) {
848 EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n");
850 ev_io_init(&IO->conn_event,
851 IO_connestd_callback,
855 IO->conn_event.data = IO;
857 ev_io_start(event_base, &IO->conn_event);
858 ev_timer_start(event_base, &IO->conn_fail);
859 return IO->NextState;
/* Hard failure: the failure handling is deferred to an idle watcher. */
862 ev_idle_init(&IO->conn_fail_immediate,
863 IO_connfailimmediate_callback);
864 IO->conn_fail_immediate.data = IO;
865 ev_idle_start(event_base, &IO->conn_fail_immediate);
867 EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno));
868 StrBufPrintf(IO->ErrMsg,
869 "Failed to connect: %s",
871 return IO->NextState;
873 return IO->NextState;
/*
 * Set the repeat interval of the rw_timeout timer of IO to timeout and
 * re-arm it on event_base through ev_timer_again().
 */
876 void SetNextTimeout(AsyncIO *IO, double timeout)
878 IO->rw_timeout.repeat = timeout;
879 ev_timer_again (event_base, &IO->rw_timeout);
/*
 * Attach IO to the client event loop again: call become_session(), start the
 * abort_by_shutdown cleanup watcher on event_base, set NextState to
 * eReadMessage or eSendReply (the condition is not visible here) and start
 * I/O through set_start_callback(). Returns the resulting IO->NextState.
 */
883 eNextState ReAttachIO(AsyncIO *IO,
888 become_session(IO->CitContext);
889 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
891 IO->NextState = eReadMessage;
894 IO->NextState = eSendReply;
896 set_start_callback(event_base, IO, 0);
898 return IO->NextState;
/*
 * Initialise an AsyncIO for socket-based client I/O.
 *
 * The IO receives a clone of the current context (CC) whose
 * session_specific_data points at Data, the initial NextState, and the
 * handler callbacks passed in. Send and receive buffers are created with
 * NewStrBufPlain(NULL, 1024) and IOBuf with NewStrBuf(). The Data and
 * Timeout parameters are used below, but their declarations are on lines
 * not shown here.
 */
901 void InitIOStruct(AsyncIO *IO,
903 eNextState NextState,
904 IO_LineReaderCallback LineReader,
905 IO_CallBack DNS_Fail,
906 IO_CallBack SendDone,
907 IO_CallBack ReadDone,
908 IO_CallBack Terminate,
909 IO_CallBack DBTerminate,
910 IO_CallBack ConnFail,
912 IO_CallBack ShutdownAbort)
/* Each IO works on its own clone of the calling context. */
916 IO->CitContext = CloneContext(CC);
917 ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
919 IO->NextState = NextState;
921 IO->SendDone = SendDone;
922 IO->ReadDone = ReadDone;
923 IO->Terminate = Terminate;
924 IO->DBTerminate = DBTerminate;
925 IO->LineReader = LineReader;
926 IO->ConnFail = ConnFail;
927 IO->Timeout = Timeout;
928 IO->ShutdownAbort = ShutdownAbort;
930 IO->DNS.Fail = DNS_Fail;
932 IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024);
933 IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
934 IO->IOBuf = NewStrBuf();
936 "EVENT: Session lives at %p IO at %p \n",
/* Prototype only (definition not in this section); called at the end of
 * InitcURLIOStruct. */
941 extern int evcurl_init(AsyncIO *IO);
/*
 * Initialise an AsyncIO for a curl-driven request.
 *
 * The IO receives a clone of the current context (CC) whose
 * session_specific_data points at Data, the handler callbacks passed in and
 * a copy of Desc in HttpReq.errdesc. The Data and Desc parameters are used
 * below, but their declarations are on lines not shown here.
 *
 * Returns the result of evcurl_init(IO).
 *
 * NOTE(review): strcpy() into HttpReq.errdesc has no visible bound - confirm
 * that Desc always fits into that buffer.
 */
943 int InitcURLIOStruct(AsyncIO *IO,
946 IO_CallBack SendDone,
947 IO_CallBack Terminate,
948 IO_CallBack DBTerminate,
949 IO_CallBack ShutdownAbort)
953 IO->CitContext = CloneContext(CC);
954 ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
956 IO->SendDone = SendDone;
957 IO->Terminate = Terminate;
958 IO->DBTerminate = DBTerminate;
959 IO->ShutdownAbort = ShutdownAbort;
961 strcpy(IO->HttpReq.errdesc, Desc);
964 return evcurl_init(IO);
/*
 * Log the current call stack at LOG_ALERT when HAVE_BACKTRACE is defined.
 * Up to 50 frames are captured with backtrace() and resolved with
 * backtrace_symbols(); each frame is logged either as a symbol string or as
 * a raw address (the condition choosing between them is not visible here).
 *
 * NOTE(review): no free() of the backtrace_symbols() result is visible -
 * confirm the array is released.
 */
968 void EV_backtrace(AsyncIO *IO)
970 #ifdef HAVE_BACKTRACE
971 void *stack_frames[50];
/* The second argument is the capacity of stack_frames in elements. */
976 size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
977 strings = backtrace_symbols(stack_frames, size);
978 for (i = 0; i < size; i++) {
980 EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
982 EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
/* Return the timestamp that ev_now() reports for the client event loop
 * (event_base). */
989 ev_tstamp ctdl_ev_now (void)
991 return ev_now(event_base);