2 * Copyright (c) 1998-2012 by the citadel.org team
4 * This program is open source software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 3.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
28 #include <sys/types.h>
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
36 # include <sys/time.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
54 #include "citserver.h"
57 #include "ctdl_module.h"
59 #include "event_client.h"
60 #include "serv_curl.h"
63 int DebugEventLoop = 0;
67 /*****************************************************************************
68 * libevent / curl integration *
69 *****************************************************************************/
72 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
74 syslog(LOG_ERR, "EVCURL: error setting option " \
75 #s " on curl multi handle: %s\n", \
76 curl_easy_strerror(sta)); \
81 typedef struct _evcurl_global_data {
89 evcurl_global_data global;
100 "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
101 while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
103 "EVCURL: got curl multi_info message msg=%d\n",
106 if (CURLMSG_DONE == msg->msg) {
114 chnd = msg->easy_handle;
115 sta = curl_easy_getinfo(chnd,
118 syslog(LOG_ERR, "EVCURL: request complete\n");
121 "EVCURL: error asking curl for private"
122 " cookie of curl handle: %s\n",
123 curl_easy_strerror(sta));
124 IO = (AsyncIO *)chandle;
126 IO->Now = ev_now(event_base);
128 ev_io_stop(event_base, &IO->recv_event);
129 ev_io_stop(event_base, &IO->send_event);
131 sta = msg->data.result;
134 "EVCURL: error description: %s\n",
135 IO->HttpReq.errdesc);
137 "EVCURL: error performing request: %s\n",
138 curl_easy_strerror(sta));
140 sta = curl_easy_getinfo(chnd,
141 CURLINFO_RESPONSE_CODE,
142 &IO->HttpReq.httpcode);
145 "EVCURL: error asking curl for "
146 "response code from request: %s\n",
147 curl_easy_strerror(sta));
149 "EVCURL: http response code was %ld\n",
150 (long)IO->HttpReq.httpcode);
153 curl_slist_free_all(IO->HttpReq.headers);
154 msta = curl_multi_remove_handle(global.mhnd, chnd);
157 "EVCURL: warning problem detaching "
158 "completed handle from curl multi: "
160 curl_multi_strerror(msta));
162 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
164 IO->HttpReq.attached = 0;
165 switch(IO->SendDone(IO))
168 curl_easy_cleanup(IO->HttpReq.chnd);
169 IO->HttpReq.chnd = NULL;
181 curl_easy_cleanup(IO->HttpReq.chnd);
182 IO->HttpReq.chnd = NULL;
184 case eTerminateConnection:
186 curl_easy_cleanup(IO->HttpReq.chnd);
187 IO->HttpReq.chnd = NULL;
188 FreeStrBuf(&IO->HttpReq.ReplyData);
189 FreeURL(&IO->ConnectMe);
190 RemoveContext(IO->CitContext);
198 stepmulti(void *data, curl_socket_t fd, int which)
200 int running_handles = 0;
203 msta = curl_multi_socket_action(global.mhnd,
208 syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
211 "EVCURL: error in curl processing events"
212 "on multi handle, fd %d: %s\n",
214 curl_multi_strerror(msta));
216 if (global.nrun != running_handles)
217 gotstatus(running_handles);
221 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
223 syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
224 stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
228 got_in(struct ev_loop *loop, ev_io *ioev, int events)
231 "EVCURL: waking up curl for io on fd %d\n",
234 stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
238 got_out(struct ev_loop *loop, ev_io *ioev, int events)
241 "EVCURL: waking up curl for io on fd %d\n",
244 stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
248 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
249 AsyncIO *IO = (AsyncIO*) cglobal;
251 if (IO->HttpReq.ReplyData == NULL)
253 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
255 IO->Now = ev_now(event_base);
256 return CurlFillStrBuf_callback(data,
259 IO->HttpReq.ReplyData);
263 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
264 syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
265 evcurl_global_data *global = cglobal;
266 ev_timer_stop(EV_DEFAULT, &global->timeev);
267 if (tblock_ms < 0 || 14000 < tblock_ms)
269 ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
270 ev_timer_start(EV_DEFAULT_UC, &global->timeev);
271 curl_multi_perform(global, &global->nrun);
276 gotwatchsock(CURL *easy,
282 evcurl_global_data *global = cglobal;
283 CURLM *mhnd = global->mhnd;
285 AsyncIO *IO = (AsyncIO*) vIO;
290 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
293 "EVCURL: error asking curl for private "
294 "cookie of curl handle: %s\n",
295 curl_easy_strerror(sta));
300 "EVCURL: got socket for URL: %s\n",
301 IO->ConnectMe->PlainUrl);
303 if (IO->SendBuf.fd != 0)
305 ev_io_stop(event_base, &IO->recv_event);
306 ev_io_stop(event_base, &IO->send_event);
309 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
310 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
311 curl_multi_assign(mhnd, fd, IO);
314 IO->Now = ev_now(event_base);
320 Action = "CURL_POLL_NONE";
322 case CURL_POLL_REMOVE:
323 Action = "CURL_POLL_REMOVE";
326 Action = "CURL_POLL_IN";
329 Action = "CURL_POLL_OUT";
331 case CURL_POLL_INOUT:
332 Action = "CURL_POLL_INOUT";
338 "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
339 (int)fd, Action, action);
345 "EVCURL: called first time "
346 "to register this sockwatcker\n");
348 case CURL_POLL_REMOVE:
350 "EVCURL: called last time to unregister "
351 "this sockwatcher\n");
352 ev_io_stop(event_base, &IO->recv_event);
353 ev_io_stop(event_base, &IO->send_event);
356 ev_io_start(event_base, &IO->recv_event);
357 ev_io_stop(event_base, &IO->send_event);
360 ev_io_start(event_base, &IO->send_event);
361 ev_io_stop(event_base, &IO->recv_event);
363 case CURL_POLL_INOUT:
364 ev_io_start(event_base, &IO->send_event);
365 ev_io_start(event_base, &IO->recv_event);
371 void curl_init_connectionpool(void)
375 ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
376 global.timeev.data = (void *)&global;
378 CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
383 "EVCURL: error initializing curl library: %s\n",
384 curl_easy_strerror(sta));
388 mhnd = global.mhnd = curl_multi_init();
392 "EVCURL: error initializing curl multi handle\n");
396 MOPT(SOCKETFUNCTION, &gotwatchsock);
397 MOPT(SOCKETDATA, (void *)&global);
398 MOPT(TIMERFUNCTION, &gotwatchtime);
399 MOPT(TIMERDATA, (void *)&global);
404 int evcurl_init(AsyncIO *IO)
409 EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
410 IO->HttpReq.attached = 0;
411 chnd = IO->HttpReq.chnd = curl_easy_init();
414 EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
419 OPT(VERBOSE, (long)1);
424 OPT(FAILONERROR, (long)1);
426 OPT(FOLLOWLOCATION, (long)0);
427 OPT(MAXREDIRS, (long)0);
428 OPT(USERAGENT, CITADEL);
430 OPT(TIMEOUT, (long)1800);
431 OPT(LOW_SPEED_LIMIT, (long)64);
432 OPT(LOW_SPEED_TIME, (long)600);
433 OPT(CONNECTTIMEOUT, (long)600);
434 OPT(PRIVATE, (void *)IO);
436 OPT(FORBID_REUSE, 1);
437 OPT(WRITEFUNCTION, &gotdata);
438 OPT(WRITEDATA, (void *)IO);
439 OPT(ERRORBUFFER, IO->HttpReq.errdesc);
441 if ((!IsEmptyStr(config.c_ip_addr))
442 && (strcmp(config.c_ip_addr, "*"))
443 && (strcmp(config.c_ip_addr, "::"))
444 && (strcmp(config.c_ip_addr, "0.0.0.0"))
447 OPT(INTERFACE, config.c_ip_addr);
450 #ifdef CURLOPT_HTTP_CONTENT_DECODING
451 OPT(HTTP_CONTENT_DECODING, 1);
455 IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
456 "Connection: close");
462 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
467 AsyncIO *IO = watcher->data;
468 IO->Now = ev_now(event_base);
469 EV_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
471 curl_slist_free_all(IO->HttpReq.headers);
472 msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
476 "EVCURL: warning problem detaching completed handle "
477 "from curl multi: %s\n",
478 curl_multi_strerror(msta));
481 curl_easy_cleanup(IO->HttpReq.chnd);
482 IO->HttpReq.chnd = NULL;
483 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
484 ev_io_stop(event_base, &IO->recv_event);
485 ev_io_stop(event_base, &IO->send_event);
486 assert(IO->ShutdownAbort);
487 IO->ShutdownAbort(IO);
490 evcurl_handle_start(AsyncIO *IO)
496 chnd = IO->HttpReq.chnd;
498 "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
499 OPT(URL, IO->ConnectMe->PlainUrl);
500 if (StrLength(IO->ConnectMe->CurlCreds))
502 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
503 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
505 if (StrLength(IO->HttpReq.PostData) > 0)
507 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
508 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
511 else if ((IO->HttpReq.PlainPostDataLen != 0) &&
512 (IO->HttpReq.PlainPostData != NULL))
514 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
515 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
517 OPT(HTTPHEADER, IO->HttpReq.headers);
519 IO->NextState = eConnect;
520 EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
521 msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
525 "EVCURL: error attaching to curl multi handle: %s\n",
526 curl_multi_strerror(msta));
529 IO->HttpReq.attached = 1;
530 ev_async_send (event_base, &WakeupCurl);
531 ev_cleanup_init(&IO->abort_by_shutdown,
532 IOcurl_abort_shutdown_callback);
534 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
538 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
540 syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
542 curl_multi_perform(&global, CURL_POLL_NONE);
545 static void evcurl_shutdown (void)
547 curl_global_cleanup();
548 curl_multi_cleanup(global.mhnd);
549 syslog(LOG_DEBUG, "client_event_thread() initializing\n");
551 /*****************************************************************************
552 * libevent integration *
553 *****************************************************************************/
555 * client event queue plus its methods.
556 * this currently is the main loop (which may change in some future?)
558 int evbase_count = 0;
559 int event_add_pipe[2] = {-1, -1};
560 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
561 HashList *QueueEvents = NULL;
562 HashList *InboundEventQueue = NULL;
563 HashList *InboundEventQueues[2] = { NULL, NULL };
566 ev_async ExitEventLoop;
568 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
577 /* get the control command... */
578 pthread_mutex_lock(&EventQueueMutex);
580 if (InboundEventQueues[0] == InboundEventQueue) {
581 InboundEventQueue = InboundEventQueues[1];
582 q = InboundEventQueues[0];
585 InboundEventQueue = InboundEventQueues[0];
586 q = InboundEventQueues[1];
588 pthread_mutex_unlock(&EventQueueMutex);
589 Now = ev_now (event_base);
590 It = GetNewHashPos(q, 0);
591 while (GetNextHashPos(q, It, &len, &Key, &v))
594 if (h->IO->ID == 0) {
595 h->IO->ID = EvIDSource++;
597 if (h->IO->StartIO == 0.0)
598 h->IO->StartIO = Now;
603 DeleteHashContent(&q);
604 syslog(LOG_DEBUG, "EVENT Q Add done.\n");
608 static void EventExitCallback(EV_P_ ev_async *w, int revents)
610 ev_break(event_base, EVBREAK_ALL);
612 syslog(LOG_DEBUG, "EVENT Q exiting.\n");
617 void InitEventQueue(void)
619 struct rlimit LimitSet;
621 pthread_mutex_init(&EventQueueMutex, NULL);
623 if (pipe(event_add_pipe) != 0) {
625 "Unable to create pipe for libev queueing: %s\n",
629 LimitSet.rlim_cur = 1;
630 LimitSet.rlim_max = 1;
631 setrlimit(event_add_pipe[1], &LimitSet);
633 QueueEvents = NewHash(1, Flathash);
634 InboundEventQueues[0] = NewHash(1, Flathash);
635 InboundEventQueues[1] = NewHash(1, Flathash);
636 InboundEventQueue = InboundEventQueues[0];
638 extern void CtdlDestroyEVCleanupHooks(void);
640 extern int EVQShutDown;
642 * this thread operates the select() etc. via libev.
644 void *client_event_thread(void *arg)
646 struct CitContext libev_client_CC;
648 CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
649 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
650 syslog(LOG_DEBUG, "client_event_thread() initializing\n");
652 event_base = ev_default_loop (EVFLAG_AUTO);
653 ev_async_init(&AddJob, QueueEventAddCallback);
654 ev_async_start(event_base, &AddJob);
655 ev_async_init(&ExitEventLoop, EventExitCallback);
656 ev_async_start(event_base, &ExitEventLoop);
657 ev_async_init(&WakeupCurl, WakeupCurlCallback);
658 ev_async_start(event_base, &WakeupCurl);
660 curl_init_connectionpool();
662 ev_run (event_base, 0);
664 syslog(LOG_DEBUG, "client_event_thread() exiting\n");
666 ///what todo here? CtdlClearSystemContext();
667 ev_loop_destroy (EV_DEFAULT_UC);
668 DeleteHash(&QueueEvents);
669 InboundEventQueue = NULL;
670 DeleteHash(&InboundEventQueues[0]);
671 DeleteHash(&InboundEventQueues[1]);
672 /* citthread_mutex_destroy(&EventQueueMutex); TODO */
674 close(event_add_pipe[0]);
675 close(event_add_pipe[1]);
677 CtdlDestroyEVCleanupHooks();
683 /*----------------------------------------------------------------------------*/
685 * DB-Queue; does async bdb operations.
686 * has its own set of handlers.
690 int evdb_add_pipe[2] = {-1, -1};
691 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
692 HashList *DBQueueEvents = NULL;
693 HashList *DBInboundEventQueue = NULL;
694 HashList *DBInboundEventQueues[2] = { NULL, NULL };
697 ev_async DBExitEventLoop;
699 extern void ShutDownDBCLient(AsyncIO *IO);
701 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
710 /* get the control command... */
711 pthread_mutex_lock(&DBEventQueueMutex);
713 if (DBInboundEventQueues[0] == DBInboundEventQueue) {
714 DBInboundEventQueue = DBInboundEventQueues[1];
715 q = DBInboundEventQueues[0];
718 DBInboundEventQueue = DBInboundEventQueues[0];
719 q = DBInboundEventQueues[1];
721 pthread_mutex_unlock(&DBEventQueueMutex);
723 Now = ev_now (event_db);
724 It = GetNewHashPos(q, 0);
725 while (GetNextHashPos(q, It, &len, &Key, &v))
730 h->IO->ID = EvIDSource++;
731 if (h->IO->StartDB == 0.0)
732 h->IO->StartDB = Now;
734 rc = h->EvAttch(h->IO);
738 ShutDownDBCLient(h->IO);
744 DeleteHashContent(&q);
745 syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
749 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
751 syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
752 ev_break(event_db, EVBREAK_ALL);
757 void DBInitEventQueue(void)
759 struct rlimit LimitSet;
761 pthread_mutex_init(&DBEventQueueMutex, NULL);
763 if (pipe(evdb_add_pipe) != 0) {
764 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
767 LimitSet.rlim_cur = 1;
768 LimitSet.rlim_max = 1;
769 setrlimit(evdb_add_pipe[1], &LimitSet);
771 DBQueueEvents = NewHash(1, Flathash);
772 DBInboundEventQueues[0] = NewHash(1, Flathash);
773 DBInboundEventQueues[1] = NewHash(1, Flathash);
774 DBInboundEventQueue = DBInboundEventQueues[0];
778 * this thread operates writing to the message database via libev.
780 void *db_event_thread(void *arg)
782 struct CitContext libev_msg_CC;
784 CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
785 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
786 syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
788 event_db = ev_loop_new (EVFLAG_AUTO);
790 ev_async_init(&DBAddJob, DBQueueEventAddCallback);
791 ev_async_start(event_db, &DBAddJob);
792 ev_async_init(&DBExitEventLoop, DBEventExitCallback);
793 ev_async_start(event_db, &DBExitEventLoop);
795 ev_run (event_db, 0);
797 syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
799 //// what to do here? CtdlClearSystemContext();
800 ev_loop_destroy (event_db);
802 DeleteHash(&DBQueueEvents);
803 DBInboundEventQueue = NULL;
804 DeleteHash(&DBInboundEventQueues[0]);
805 DeleteHash(&DBInboundEventQueues[1]);
807 close(evdb_add_pipe[0]);
808 close(evdb_add_pipe[1]);
809 /* citthread_mutex_destroy(&DBEventQueueMutex); TODO */
814 void ShutDownEventQueues(void)
816 syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
818 pthread_mutex_lock(&DBEventQueueMutex);
819 ev_async_send (event_db, &DBExitEventLoop);
820 pthread_mutex_unlock(&DBEventQueueMutex);
822 pthread_mutex_lock(&EventQueueMutex);
823 ev_async_send (EV_DEFAULT_ &ExitEventLoop);
824 pthread_mutex_unlock(&EventQueueMutex);
827 void DebugEventloopEnable(void)
832 void DebugCurlEnable(void)
837 CTDL_MODULE_INIT(event_client)
841 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable);
842 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable);
845 CtdlThreadCreate(client_event_thread);
846 CtdlThreadCreate(db_event_thread);