3 * Copyright (c) 1998-2009 by the citadel.org team
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
29 #include <sys/types.h>
32 #if TIME_WITH_SYS_TIME
33 # include <sys/time.h>
37 # include <sys/time.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
51 #include <libcitadel.h>
54 #include "citserver.h"
61 #include "internet_addressing.h"
64 #include "clientsocket.h"
65 #include "locate_host.h"
66 #include "citadel_dirs.h"
68 #include "event_client.h"
// Cleanup-watcher callback shared by the abort_by_shutdown watchers in this file.
// Recovers the AsyncIO from watcher->data, logs the function name and calls the
// ShutdownAbort handler of that IO, which is asserted to be non-NULL.
70 static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents)
72 AsyncIO *IO = watcher->data;
73 EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
75 assert(IO->ShutdownAbort);
76 IO->ShutdownAbort(IO);
80 /*--------------------------------------------------------------------------------
83 extern int evdb_count;
84 extern pthread_mutex_t DBEventQueueMutex;
85 extern HashList *DBInboundEventQueue;
86 extern struct ev_loop *event_db;
87 extern ev_async DBAddJob;
88 extern ev_async DBExitEventLoop;
// Queues work for the DB event loop (event_db).
// Visible steps: allocate an IOAddHandler, arm the db_abort_by_shutdown cleanup
// watcher of the IO on event_db, insert the handler into DBInboundEventQueue while
// holding DBEventQueueMutex, then wake the loop through the DBAddJob async watcher.
90 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
// NOTE(review): no NULL check on the malloc() result is visible in this excerpt.
95 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
98 ev_cleanup_init(&IO->db_abort_by_shutdown,
99 IO_abort_shutdown_callback);
100 IO->db_abort_by_shutdown.data = IO;
101 ev_cleanup_start(event_db, &IO->db_abort_by_shutdown);
// The inbound queue is only touched with the mutex held.
103 pthread_mutex_lock(&DBEventQueueMutex);
104 EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
106 Put(DBInboundEventQueue, IKEY(i), h, NULL);
107 pthread_mutex_unlock(&DBEventQueueMutex);
// Signal the DB loop after the lock has been released.
109 ev_async_send (event_db, &DBAddJob);
110 EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
// Shuts down the DB-loop side of a client IO: stops the db_abort_by_shutdown
// cleanup watcher on event_db, asserts that a Terminate handler is set and marks
// the owning CitContext as CON_IDLE.
114 void ShutDownDBCLient(AsyncIO *IO)
116 CitContext *Ctx =IO->CitContext;
119 EVM_syslog(LOG_DEBUG, "DBEVENT\n");
120 ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
122 assert(IO->Terminate);
125 Ctx->state = CON_IDLE;
// Idle-watcher callback for the DB loop. Switches to the session of the IO, stops
// the db_unwind_stack idle watcher and dispatches on the state returned by
// IO->NextDBOperation(IO), which is asserted to be non-NULL.
130 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
132 AsyncIO *IO = watcher->data;
133 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
134 become_session(IO->CitContext);
// The idle watcher is one-shot: stop it before running the queued operation.
136 ev_idle_stop(event_db, &IO->db_unwind_stack);
138 assert(IO->NextDBOperation);
139 switch (IO->NextDBOperation(IO))
// One branch (its case labels are not visible here) unregisters the DB shutdown hook.
153 ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
// Termination is routed to ShutDownDBCLient().
155 case eTerminateConnection:
157 ShutDownDBCLient(IO);
// Stores CB as the next DB operation of the IO and schedules it by (re)initialising
// and starting the db_unwind_stack idle watcher on event_db.
// The callback argument of ev_idle_init sits on a line not visible in this excerpt.
161 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
163 IO->NextDBOperation = CB;
164 ev_idle_init(&IO->db_unwind_stack,
166 IO->db_unwind_stack.data = IO;
167 ev_idle_start(event_db, &IO->db_unwind_stack);
171 /*--------------------------------------------------------------------------------
174 extern int evbase_count;
175 extern pthread_mutex_t EventQueueMutex;
176 extern HashList *InboundEventQueue;
177 extern struct ev_loop *event_base;
178 extern ev_async AddJob;
179 extern ev_async ExitEventLoop;
// Queues work for the client I/O event loop (event_base); the counterpart of
// QueueDBOperation. Visible steps: allocate an IOAddHandler, arm the
// abort_by_shutdown cleanup watcher on event_base, insert the handler into
// InboundEventQueue under EventQueueMutex, then wake the loop through AddJob.
182 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
// NOTE(review): no NULL check on the malloc() result is visible in this excerpt.
187 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
190 ev_cleanup_init(&IO->abort_by_shutdown,
191 IO_abort_shutdown_callback);
192 IO->abort_by_shutdown.data = IO;
193 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
// The inbound queue is only touched with the mutex held.
195 pthread_mutex_lock(&EventQueueMutex);
196 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
198 Put(InboundEventQueue, IKEY(i), h, NULL);
199 pthread_mutex_unlock(&EventQueueMutex);
// Signal the loop after the lock has been released.
201 ev_async_send (event_base, &AddJob);
202 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
206 extern eNextState evcurl_handle_start(AsyncIO *IO);
// Queues an IO for the event_base loop with evcurl_handle_start as the attach
// callback. Unlike QueueEventContext, no cleanup watcher is armed in the lines
// visible here.
208 eNextState QueueCurlContext(AsyncIO *IO)
// NOTE(review): no NULL check on the malloc() result is visible in this excerpt.
213 h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
215 h->EvAttch = evcurl_handle_start;
217 pthread_mutex_lock(&EventQueueMutex);
218 EVM_syslog(LOG_DEBUG, "EVENT Q\n");
220 Put(InboundEventQueue, IKEY(i), h, NULL);
221 pthread_mutex_unlock(&EventQueueMutex);
223 ev_async_send (event_base, &AddJob);
224 EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
// Asks both event loops to exit by sending their exit async watchers, each send
// being made while the matching queue mutex is held.
228 int ShutDownEventQueue(void)
230 pthread_mutex_lock(&DBEventQueueMutex);
231 ev_async_send (event_db, &DBExitEventLoop);
232 pthread_mutex_unlock(&DBEventQueueMutex);
234 pthread_mutex_lock(&EventQueueMutex);
// NOTE(review): this send targets the default loop (EV_DEFAULT_) while the rest of
// the file addresses event_base; confirm that both name the same loop.
235 ev_async_send (EV_DEFAULT_ &ExitEventLoop);
236 pthread_mutex_unlock(&EventQueueMutex);
// Releases the string buffers owned by the IO: the line buffer IOBuf and the
// buffers behind SendBuf and RecvBuf. The AsyncIO structure itself is not freed
// in the lines visible here.
240 void FreeAsyncIOContents(AsyncIO *IO)
242 FreeStrBuf(&IO->IOBuf);
243 FreeStrBuf(&IO->SendBuf.Buf);
244 FreeStrBuf(&IO->RecvBuf.Buf);
// Stops every per-connection watcher of the IO on event_base (connect timeout,
// connect readiness, unwind idle, send, receive, read/write timeout) and then
// closes the socket held in SendBuf.fd.
248 void StopClientWatchers(AsyncIO *IO)
250 ev_timer_stop(event_base, &IO->conn_fail);
251 ev_io_stop(event_base, &IO->conn_event);
252 ev_idle_stop(event_base, &IO->unwind_stack);
254 ev_io_stop(event_base, &IO->send_event);
255 ev_io_stop(event_base, &IO->recv_event);
256 ev_timer_stop (event_base, &IO->rw_timeout);
// The watchers are stopped before the descriptor they refer to is closed.
257 close(IO->SendBuf.fd);
// Shuts down a client IO on event_base: removes the shutdown hook, stops all
// connection watchers via StopClientWatchers(), tears down the DNS channel if one
// exists, asserts that a Terminate handler is set and marks the context CON_IDLE.
262 void ShutDownCLient(AsyncIO *IO)
264 CitContext *Ctx =IO->CitContext;
267 EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
269 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
270 StopClientWatchers(IO);
272 if (IO->DNSChannel != NULL) {
// NOTE(review): the channel is destroyed before its two I/O watchers are stopped;
// confirm that ares_destroy() cannot trigger those watchers or close their fds first.
273 ares_destroy(IO->DNSChannel);
274 ev_io_stop(event_base, &IO->dns_recv_event);
275 ev_io_stop(event_base, &IO->dns_send_event);
// Clearing the pointer keeps a second call from destroying the channel again.
276 IO->DNSChannel = NULL;
278 assert(IO->Terminate);
280 Ctx->state = CON_IDLE;
// Consumes data already sitting in the receive buffer while the IO is in one of
// the read states (eReadMessage, eReadMore, eReadFile, eReadPayload) and the
// buffer is not yet drained, then restarts the watcher that fits the resulting
// NextState. Several case labels and braces are not visible in this excerpt.
285 eReadState HandleInbound(AsyncIO *IO)
287 const char *Err = NULL;
288 eReadState Finished = eBufferNotEmpty;
290 become_session(IO->CitContext);
292 while ((Finished == eBufferNotEmpty) &&
293 ((IO->NextState == eReadMessage)||
294 (IO->NextState == eReadMore)||
295 (IO->NextState == eReadFile)||
296 (IO->NextState == eReadPayload)))
// A non-zero nBlobBytesWanted selects blob reading; otherwise input is line based.
298 if (IO->RecvBuf.nBlobBytesWanted != 0) {
301 else { /* Reading lines... */
302 //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP
// File mode: flush what was already read into the IOB; success moves on to eSendReply.
303 if ((IO->NextState == eReadFile) &&
304 (Finished == eBufferNotEmpty))
306 Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
307 if (Finished == eReadSuccess)
309 IO->NextState = eSendReply;
// A protocol-specific LineReader takes precedence over the generic line splitter.
312 else if (IO->LineReader)
313 Finished = IO->LineReader(IO);
315 Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
318 case eMustReadMore: /// read new from socket...
320 case eBufferNotEmpty: /* shouldn't happen... */
321 case eReadSuccess: /// done for now...
323 case eReadFail: /// WHUT?
// Anything but eMustReadMore hands control to ReadDone and re-checks the buffer.
330 if (Finished != eMustReadMore) {
331 assert(IO->ReadDone);
332 ev_io_stop(event_base, &IO->recv_event);
333 IO->NextState = IO->ReadDone(IO);
334 Finished = StrBufCheckBuffer(&IO->RecvBuf);
// After the loop: start the send or receive watcher according to NextState.
338 switch (IO->NextState) {
340 ev_io_start(event_base, &IO->send_event);
344 assert(IO->SendDone);
345 IO->NextState = IO->SendDone(IO);
346 ev_io_start(event_base, &IO->send_event);
351 ev_io_start(event_base, &IO->recv_event);
353 case eTerminateConnection:
// Write-readiness callback. Sends one chunk (file chunk or send-buffer chunk,
// depending on NextState) and, once the write step reports completion, stops the
// send watcher and advances the state machine through SendDone or ReadDone.
// Many case labels and conditions are not visible in this excerpt.
371 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
374 AsyncIO *IO = watcher->data;
375 const char *errmsg = NULL;
377 become_session(IO->CitContext);
// Debug dump of the unsent part of the send buffer to a per-service file in /tmp.
// NOTE(review): presumably guarded by a debug-only preprocessor block that is not
// visible here; the fopen() result is not visibly checked before fprintf().
383 const char *pch = ChrPtr(IO->SendBuf.Buf);
384 const char *pchh = IO->SendBuf.ReadWritePointer;
390 nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
391 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
392 ((CitContext*)(IO->CitContext))->ServiceName,
395 fd = fopen(fn, "a+");
396 fprintf(fd, "Send: BufSize: %ld BufContent: [",
398 rv = fwrite(pchh, nbytes, 1, fd);
399 if (!rv) printf("failed to write debug to %s!\n", fn);
// The actual write: file transfer goes through FileSendChunked, anything else
// through the StrBuf chunk writer on the watcher fd.
402 switch (IO->NextState) {
404 rc = FileSendChunked(&IO->IOB, &errmsg);
406 StrBufPlain(IO->ErrMsg, errmsg, -1);
409 rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
413 fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
// Completion path: the send watcher is stopped before the next state is chosen.
419 ev_io_stop(event_base, &IO->send_event);
420 switch (IO->NextState) {
422 assert(IO->SendDone);
423 IO->NextState = IO->SendDone(IO);
425 if ((IO->NextState == eTerminateConnection) ||
426 (IO->NextState == eAbort) )
429 ev_io_start(event_base, &IO->send_event);
// File transfer still has bytes outstanding: keep receiving; otherwise call ReadDone.
433 if (IO->IOB.ChunkSendRemain > 0) {
434 ev_io_start(event_base, &IO->recv_event);
436 assert(IO->ReadDone);
437 IO->NextState = IO->ReadDone(IO);
438 switch(IO->NextState) {
447 ev_io_start(event_base, &IO->send_event);
454 case eTerminateConnection:
461 if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
463 IO->NextState = eReadMore;
// Data may already be buffered from an earlier read; otherwise wait for the socket.
468 if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
472 ev_io_start(event_base, &IO->recv_event);
477 /* we now live in another queue, so we have to unregister. */
478 ev_cleanup_stop(loop, &IO->abort_by_shutdown);
483 case eTerminateConnection:
492 /* else : must write more. */
// Kicks off I/O for a freshly connected or re-attached IO according to NextState:
// one branch starts the receive watcher, another switches to the session and
// invokes IO_send_callback directly. The case labels are not visible here.
495 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
498 switch(IO->NextState) {
502 ev_io_start(event_base, &IO->recv_event);
508 become_session(IO->CitContext);
509 IO_send_callback(loop, &IO->send_event, revents);
515 case eTerminateConnection:
// Read/write timeout callback. Stops the rw_timeout timer, and if a socket is
// recorded it stops the send and receive watchers, closes the socket and resets
// both fd fields to 0, then dispatches on the result of IO->Timeout(IO).
523 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
525 AsyncIO *IO = watcher->data;
527 ev_timer_stop (event_base, &IO->rw_timeout);
528 become_session(IO->CitContext);
// NOTE(review): 0 is used as the no-socket sentinel here, while
// event_connect_socket stores -1 after a failed setup; an fd of -1 would reach close().
530 if (IO->SendBuf.fd != 0)
532 ev_io_stop(event_base, &IO->send_event);
533 ev_io_stop(event_base, &IO->recv_event);
// NOTE(review): rw_timeout was already stopped above; this second stop looks redundant.
534 ev_timer_stop (event_base, &IO->rw_timeout);
535 close(IO->SendBuf.fd);
536 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
// NOTE(review): unlike ConnFail elsewhere in this file, no assert on IO->Timeout is visible.
540 switch (IO->Timeout(IO))
// Connect-timeout callback. Stops the conn_fail timer; if a socket is recorded it
// stops the connect, send and receive watchers and the rw_timeout timer, closes
// the socket and resets both fd fields to 0; finally it dispatches on the result
// of IO->ConnFail(IO), which is asserted to be non-NULL.
550 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
552 AsyncIO *IO = watcher->data;
554 ev_timer_stop (event_base, &IO->conn_fail);
// NOTE(review): 0 is the no-socket sentinel here, but event_connect_socket can leave -1.
556 if (IO->SendBuf.fd != 0)
// NOTE(review): conn_event is stopped on loop, the others on event_base; presumably the same loop.
558 ev_io_stop(loop, &IO->conn_event);
559 ev_io_stop(event_base, &IO->send_event);
560 ev_io_stop(event_base, &IO->recv_event);
561 ev_timer_stop (event_base, &IO->rw_timeout);
562 close(IO->SendBuf.fd);
563 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
565 become_session(IO->CitContext);
567 assert(IO->ConnFail);
568 switch (IO->ConnFail(IO))
// Idle callback scheduled by event_connect_socket when connect() fails outright.
// Stops the conn_fail_immediate idle watcher, closes a recorded socket and resets
// both fd fields to 0, then dispatches on the result of IO->ConnFail(IO).
579 IO_connfailimmediate_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
581 AsyncIO *IO = watcher->data;
583 ev_idle_stop (event_base, &IO->conn_fail_immediate);
// NOTE(review): 0 is the no-socket sentinel here, but event_connect_socket can leave -1.
585 if (IO->SendBuf.fd != 0)
587 close(IO->SendBuf.fd);
588 IO->SendBuf.fd = IO->RecvBuf.fd = 0;
590 become_session(IO->CitContext);
592 assert(IO->ConnFail);
593 switch (IO->ConnFail(IO))
// Connection-established callback: the connect watcher fired, so stop it together
// with the connect-timeout timer and start regular I/O via set_start_callback().
604 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
606 AsyncIO *IO = watcher->data;
608 ev_io_stop(loop, &IO->conn_event);
609 ev_timer_stop (event_base, &IO->conn_fail);
610 set_start_callback(loop, IO, revents);
// Read-readiness callback. Reads one chunk (file chunk or receive-buffer chunk,
// depending on NextState) and branches on the byte count: the visible branches
// handle nbytes == 0 through IO->Timeout(IO) and nbytes == -1 as an invalid socket.
// The positive-count branch and the case labels are not visible in this excerpt.
613 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
617 AsyncIO *IO = watcher->data;
619 switch (IO->NextState) {
621 nbytes = FileRecvChunked(&IO->IOB, &errmsg);
623 StrBufPlain(IO->ErrMsg, errmsg, -1);
// File transfer complete: nothing left to receive, so switch to sending the reply.
626 if (IO->IOB.ChunkSendRemain == 0)
628 IO->NextState = eSendReply;
635 nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
// Debug dump of the unread part of the receive buffer to a per-service file in /tmp.
// NOTE(review): presumably debug-only; this reassigns nbytes, and the fopen()
// result is not visibly checked before fprintf().
644 const char *pch = ChrPtr(IO->RecvBuf.Buf);
645 const char *pchh = IO->RecvBuf.ReadWritePointer;
651 nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
652 snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
653 ((CitContext*)(IO->CitContext))->ServiceName,
656 fd = fopen(fn, "a+");
657 fprintf(fd, "Read: BufSize: %ld BufContent: [",
659 rv = fwrite(pchh, nbytes, 1, fd);
660 if (!rv) printf("failed to write debug to %s!\n", fn);
// A zero-byte read is reported through the Timeout handler of the IO.
669 } else if (nbytes == 0) {
672 switch (IO->Timeout(IO))
680 } else if (nbytes == -1) {
681 /// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno);
683 "EVENT: Socket Invalid! %s \n",
// Idle callback run after a DNS query: logs the function name, switches to the
// session of the IO and dispatches on the result of the PostDNS handler of the
// query, which is asserted to be non-NULL.
691 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
693 AsyncIO *IO = watcher->data;
694 EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
695 become_session(IO->CitContext);
// NOTE(review): IO->DNSQuery itself is dereferenced without a visible NULL check.
697 assert(IO->DNSQuery->PostDNS);
698 switch (IO->DNSQuery->PostDNS(IO))
// Creates a non-blocking socket for IO->ConnectMe, prepares the send, receive and
// timer watchers, and starts connect(). Three outcomes are visible: immediate
// success (start I/O and the rw timer), EINPROGRESS (wait on conn_event with the
// conn_fail timer armed) and any other error (schedule the ConnFail path through
// the conn_fail_immediate idle watcher). Every visible return yields IO->NextState.
//   conn_timeout      seconds given to the conn_fail timer
//   first_rw_timeout  seconds given to the rw_timeout timer
707 eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout)
// Both buffers share one descriptor; the address family follows ConnectMe->IPv6.
712 IO->SendBuf.fd = IO->RecvBuf.fd =
714 (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
718 if (IO->SendBuf.fd < 0) {
719 EV_syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno));
720 StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno));
// Switch the socket to non-blocking mode so that connect() cannot stall the loop.
723 fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
726 "EVENT: unable to get socket flags! %s \n",
728 StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno));
731 fdflags = fdflags | O_NONBLOCK;
732 if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
734 "EVENT: unable to set socket nonblocking flags! %s \n",
736 StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno));
// NOTE(review): -1 is stored here, while the failure callbacks test the fd against 0.
737 close(IO->SendBuf.fd);
738 IO->SendBuf.fd = IO->RecvBuf.fd = -1;
741 /* TODO: maybe we could use offsetof() to calc the position of data...
742 * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
744 ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
745 IO->recv_event.data = IO;
746 ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
747 IO->send_event.data = IO;
749 ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
750 IO->conn_fail.data = IO;
751 ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout, 0);
752 IO->rw_timeout.data = IO;
755 /* Bypass it like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
756 /// ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = inet_addr("127.0.0.1");
// NOTE(review): the IPv6 branch passes the address uncast and the IPv4 branch casts
// it to a sockaddr_in pointer, whereas connect() takes a pointer to struct sockaddr;
// confirm the declared type of Addr and whether this compiles without warnings.
757 if (IO->ConnectMe->IPv6)
758 rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6));
760 rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in));
// Outcome 1: connected at once, so begin I/O and arm the read/write timeout.
763 EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
764 set_start_callback(event_base, IO, 0);
765 ev_timer_start(event_base, &IO->rw_timeout);
766 return IO->NextState;
// Outcome 2: connection in progress, so wait for readiness or the connect timeout.
768 else if (errno == EINPROGRESS) {
769 EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n");
771 ev_io_init(&IO->conn_event, IO_connestd_callback, IO->SendBuf.fd, EV_READ|EV_WRITE);
772 IO->conn_event.data = IO;
774 ev_io_start(event_base, &IO->conn_event);
775 ev_timer_start(event_base, &IO->conn_fail);
776 return IO->NextState;
// Outcome 3: hard failure, reported from an idle watcher rather than from this call.
779 ev_idle_init(&IO->conn_fail_immediate,
780 IO_connfailimmediate_callback);
781 IO->conn_fail_immediate.data = IO;
782 ev_idle_start(event_base, &IO->conn_fail_immediate);
// NOTE(review): errno is read after several libev calls; presumably still the connect() error.
784 EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno));
785 StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno));
786 return IO->NextState;
788 return IO->NextState;
// Re-arms the read/write timeout of the IO: stores timeout as the repeat interval
// of rw_timeout and restarts the timer with ev_timer_again() on event_base.
791 void SetNextTimeout(AsyncIO *IO, double timeout)
793 IO->rw_timeout.repeat = timeout;
794 ev_timer_again (event_base, &IO->rw_timeout);
// Prepares an IO for its first connection: switches to its session, sets the
// initial NextState to either eReadMessage or eSendReply (the selecting condition
// and most parameters are not visible in this excerpt) and then connects through
// event_connect_socket(), whose result is returned.
797 eNextState InitEventIO(AsyncIO *IO,
800 double first_rw_timeout,
804 become_session(IO->CitContext);
807 IO->NextState = eReadMessage;
810 IO->NextState = eSendReply;
812 return event_connect_socket(IO, conn_timeout, first_rw_timeout);
// Re-registers an already connected IO with event_base: switches to its session,
// restarts the abort_by_shutdown cleanup watcher, sets NextState to either
// eReadMessage or eSendReply (the selecting condition is not visible here) and
// resumes I/O via set_start_callback(). Returns the resulting NextState.
815 eNextState ReAttachIO(AsyncIO *IO,
820 become_session(IO->CitContext);
// NOTE(review): assumes abort_by_shutdown was initialised earlier (for example by
// QueueEventContext); no ev_cleanup_init is visible in this function.
821 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
823 IO->NextState = eReadMessage;
826 IO->NextState = eSendReply;
828 set_start_callback(event_base, IO, 0);
830 return IO->NextState;