2 * Copyright (c) 1998-2012 by the citadel.org team
4 * This program is open source software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 3.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
28 #include <sys/types.h>
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
36 # include <sys/time.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
54 #include "citserver.h"
57 #include "ctdl_module.h"
59 #include "event_client.h"
60 #include "serv_curl.h"
65 /*****************************************************************************
66 * libevent / curl integration *
67 *****************************************************************************/
70 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
72 syslog(LOG_ERR, "EVCURL: error setting option " \
73 #s " on curl multi handle: %s\n", \
74 curl_easy_strerror(sta)); \
79 typedef struct _evcurl_global_data {
87 evcurl_global_data global;
98 "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
/*
 * Completion loop of the status handler: drains the message queue of the
 * curl multi handle. For every CURLMSG_DONE message the easy handle is
 * queried for its owning AsyncIO, the libev io watchers of that AsyncIO are
 * stopped, the transfer result and the HTTP response code are logged, the
 * request headers are freed, the easy handle is detached from the multi
 * handle and IO->SendDone() selects the cleanup path.
 * NOTE(review): the embedded original line numbers skip, so some lines
 * (declarations, braces, case labels, short arguments) are not visible here.
 */
99 while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
101 "EVCURL: got curl multi_info message msg=%d\n",
104 if (CURLMSG_DONE == msg->msg) {
112 chnd = msg->easy_handle;
113 sta = curl_easy_getinfo(chnd,
/* Informational only: a finished request is not an error, so log at DEBUG. */
116 syslog(LOG_DEBUG, "EVCURL: request complete\n");
119 "EVCURL: error asking curl for private"
120 " cookie of curl handle: %s\n",
121 curl_easy_strerror(sta));
122 IO = (AsyncIO *)chandle;
124 IO->Now = ev_now(event_base);
/* The transfer is over; no further socket readiness events are wanted. */
126 ev_io_stop(event_base, &IO->recv_event);
127 ev_io_stop(event_base, &IO->send_event);
129 sta = msg->data.result;
132 "EVCURL: error description: %s\n",
133 IO->HttpReq.errdesc);
135 "EVCURL: error performing request: %s\n",
136 curl_easy_strerror(sta));
138 sta = curl_easy_getinfo(chnd,
139 CURLINFO_RESPONSE_CODE,
140 &IO->HttpReq.httpcode);
143 "EVCURL: error asking curl for "
144 "response code from request: %s\n",
145 curl_easy_strerror(sta));
147 "EVCURL: http response code was %ld\n",
148 (long)IO->HttpReq.httpcode);
151 curl_slist_free_all(IO->HttpReq.headers);
152 msta = curl_multi_remove_handle(global.mhnd, chnd);
155 "EVCURL: warning problem detaching "
156 "completed handle from curl multi: "
158 curl_multi_strerror(msta));
160 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
162 IO->HttpReq.attached = 0;
/*
 * Every visible branch below destroys the easy handle; the
 * eTerminateConnection branch additionally releases the reply buffer,
 * the URL and the context.
 */
163 switch(IO->SendDone(IO))
166 curl_easy_cleanup(IO->HttpReq.chnd);
167 IO->HttpReq.chnd = NULL;
179 curl_easy_cleanup(IO->HttpReq.chnd);
180 IO->HttpReq.chnd = NULL;
182 case eTerminateConnection:
184 curl_easy_cleanup(IO->HttpReq.chnd);
185 IO->HttpReq.chnd = NULL;
186 FreeStrBuf(&IO->HttpReq.ReplyData);
187 FreeURL(&IO->ConnectMe);
188 RemoveContext(IO->CitContext);
196 stepmulti(void *data, curl_socket_t fd, int which)
198 int running_handles = 0;
201 msta = curl_multi_socket_action(global.mhnd,
206 syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
209 "EVCURL: error in curl processing events"
210 "on multi handle, fd %d: %s\n",
212 curl_multi_strerror(msta));
214 if (global.nrun != running_handles)
215 gotstatus(running_handles);
/*
 * libev timer callback (installed on global.timeev by
 * curl_init_connectionpool()): tells curl that its timeout elapsed by
 * stepping the multi handle with CURL_SOCKET_TIMEOUT and no select bits.
 */
219 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
221 syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
222 stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
/*
 * libev io callback for the read watcher (IO->recv_event, see gotwatchsock()):
 * reports the watched fd to curl as readable (CURL_CSELECT_IN).
 */
226 got_in(struct ev_loop *loop, ev_io *ioev, int events)
229 "EVCURL: waking up curl for io on fd %d\n",
232 stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
/*
 * libev io callback for the write watcher (IO->send_event, see gotwatchsock()):
 * reports the watched fd to curl as writable (CURL_CSELECT_OUT).
 */
236 got_out(struct ev_loop *loop, ev_io *ioev, int events)
239 "EVCURL: waking up curl for io on fd %d\n",
242 stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
/*
 * curl write callback (set as WRITEFUNCTION in evcurl_init(), with the
 * AsyncIO as WRITEDATA). Allocates IO->HttpReq.ReplyData on first use,
 * refreshes IO->Now and passes the received chunk on to
 * CurlFillStrBuf_callback(), whose return value is returned to curl.
 */
246 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
247 AsyncIO *IO = (AsyncIO*) cglobal;
249 if (IO->HttpReq.ReplyData == NULL)
251 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
253 IO->Now = ev_now(event_base);
254 return CurlFillStrBuf_callback(data,
257 IO->HttpReq.ReplyData);
/*
 * curl multi timer callback (registered as TIMERFUNCTION in
 * curl_init_connectionpool(), with the evcurl_global_data as TIMERDATA).
 * Re-arms the libev timer so that gottime() fires after the delay curl asked
 * for; negative values and values above 14000 ms are handled by the branch
 * below (its body is on a line missing from this listing).
 *
 * multi     - the multi handle issuing the request (unused)
 * tblock_ms - requested delay in milliseconds
 * cglobal   - pointer to the evcurl_global_data instance
 */
261 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
262 syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
263 evcurl_global_data *global = cglobal;
264 ev_timer_stop(EV_DEFAULT, &global->timeev);
265 if (tblock_ms < 0 || 14000 < tblock_ms)
267 ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
268 ev_timer_start(EV_DEFAULT_UC, &global->timeev);
/*
 * No direct curl_multi_perform() here: the former call passed the
 * evcurl_global_data pointer instead of the CURLM handle, and libcurl must
 * not be re-entered from inside its own timer callback. The timer armed
 * above drives curl through gottime() -> stepmulti().
 */
/*
 * curl multi socket callback (registered as SOCKETFUNCTION in
 * curl_init_connectionpool()). Keeps the libev io watchers of the AsyncIO
 * that owns the easy handle in sync with what curl wants to poll:
 * the AsyncIO is obtained from the handle's private cookie, the read/write
 * watchers are (re)initialised for fd and attached to the socket via
 * curl_multi_assign(), and the requested action starts or stops them.
 * NOTE(review): the remaining parameters, several case labels and the
 * conditions guarding the first branches are on lines missing from this
 * listing.
 */
274 gotwatchsock(CURL *easy,
280 evcurl_global_data *global = cglobal;
281 CURLM *mhnd = global->mhnd;
283 AsyncIO *IO = (AsyncIO*) vIO;
288 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
291 "EVCURL: error asking curl for private "
292 "cookie of curl handle: %s\n",
293 curl_easy_strerror(sta));
298 "EVCURL: got socket for URL: %s\n",
299 IO->ConnectMe->PlainUrl);
/* Watchers from an earlier socket are stopped before being re-initialised. */
301 if (IO->SendBuf.fd != 0)
303 ev_io_stop(event_base, &IO->recv_event);
304 ev_io_stop(event_base, &IO->send_event);
307 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
308 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
309 curl_multi_assign(mhnd, fd, IO);
312 IO->Now = ev_now(event_base);
/* Translate the action code into a name for the debug log. */
318 Action = "CURL_POLL_NONE";
320 case CURL_POLL_REMOVE:
321 Action = "CURL_POLL_REMOVE";
324 Action = "CURL_POLL_IN";
327 Action = "CURL_POLL_OUT";
329 case CURL_POLL_INOUT:
330 Action = "CURL_POLL_INOUT";
336 "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
337 (int)fd, Action, action);
343 "EVCURL: called first time "
344 "to register this sockwatcher\n");
346 case CURL_POLL_REMOVE:
348 "EVCURL: called last time to unregister "
349 "this sockwatcher\n");
350 ev_io_stop(event_base, &IO->recv_event);
351 ev_io_stop(event_base, &IO->send_event);
/* Read only: receive watcher on, send watcher off. */
354 ev_io_start(event_base, &IO->recv_event);
355 ev_io_stop(event_base, &IO->send_event);
/* Write only: send watcher on, receive watcher off. */
358 ev_io_start(event_base, &IO->send_event);
359 ev_io_stop(event_base, &IO->recv_event);
361 case CURL_POLL_INOUT:
362 ev_io_start(event_base, &IO->send_event);
363 ev_io_start(event_base, &IO->recv_event);
/*
 * One-time setup of the curl side of the event loop: prepares the libev
 * timer (gottime, 14 s initial delay and repeat) with the global state as its
 * payload, initialises libcurl globally, creates the shared multi handle and
 * registers gotwatchsock()/gotwatchtime() as its socket and timer callbacks,
 * both receiving &global as user data. Failures are logged.
 * NOTE(review): what happens after a logged failure is on lines missing
 * from this listing.
 */
369 void curl_init_connectionpool(void)
373 ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
374 global.timeev.data = (void *)&global;
376 CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
381 "EVCURL: error initializing curl library: %s\n",
382 curl_easy_strerror(sta));
386 mhnd = global.mhnd = curl_multi_init();
390 "EVCURL: error initializing curl multi handle\n");
394 MOPT(SOCKETFUNCTION, &gotwatchsock);
395 MOPT(SOCKETDATA, (void *)&global);
396 MOPT(TIMERFUNCTION, &gotwatchtime);
397 MOPT(TIMERDATA, (void *)&global);
/*
 * Create and preconfigure the curl easy handle of an AsyncIO.
 * Sets verbosity, fail-on-HTTP-error, no redirects, the user agent, the
 * transfer/connect timeouts and low-speed limits, stores the AsyncIO as the
 * handle's private cookie and as write-callback payload, and points curl's
 * error buffer at IO->HttpReq.errdesc. When config.c_ip_addr names a
 * specific address (not empty, "*", "::" or "0.0.0.0") it is used as the
 * outgoing interface. A "Connection: close" header is queued.
 *
 * IO - request state; IO->HttpReq.chnd receives the new handle.
 * NOTE(review): the return statements are on lines missing from this listing.
 */
402 int evcurl_init(AsyncIO *IO)
407 EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called\n");
408 IO->HttpReq.attached = 0;
409 chnd = IO->HttpReq.chnd = curl_easy_init();
412 EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
417 OPT(VERBOSE, (long)1);
422 OPT(FAILONERROR, (long)1);
424 OPT(FOLLOWLOCATION, (long)0);
425 OPT(MAXREDIRS, (long)0);
426 OPT(USERAGENT, CITADEL);
428 OPT(TIMEOUT, (long)1800);
429 OPT(LOW_SPEED_LIMIT, (long)64);
430 OPT(LOW_SPEED_TIME, (long)600);
431 OPT(CONNECTTIMEOUT, (long)600);
432 OPT(PRIVATE, (void *)IO);
/* curl_easy_setopt() is variadic: numeric options must be passed as long. */
434 OPT(FORBID_REUSE, (long)1);
435 OPT(WRITEFUNCTION, &gotdata);
436 OPT(WRITEDATA, (void *)IO);
437 OPT(ERRORBUFFER, IO->HttpReq.errdesc);
439 if ((!IsEmptyStr(config.c_ip_addr))
440 && (strcmp(config.c_ip_addr, "*"))
441 && (strcmp(config.c_ip_addr, "::"))
442 && (strcmp(config.c_ip_addr, "0.0.0.0"))
445 OPT(INTERFACE, config.c_ip_addr);
448 #ifdef CURLOPT_HTTP_CONTENT_DECODING
449 OPT(HTTP_CONTENT_DECODING, (long)1);
453 IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
454 "Connection: close");
/*
 * libev cleanup callback (installed by evcurl_handle_start()): aborts an
 * in-flight curl request. Frees the request headers,
 * detaches the easy handle from the multi handle, destroys it, stops the
 * cleanup and io watchers and finally calls IO->ShutdownAbort(), which must
 * be set (asserted).
 */
460 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
465 AsyncIO *IO = watcher->data;
466 IO->Now = ev_now(event_base);
467 EV_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
469 curl_slist_free_all(IO->HttpReq.headers);
470 msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
474 "EVCURL: warning problem detaching completed handle "
475 "from curl multi: %s\n",
476 curl_multi_strerror(msta));
479 curl_easy_cleanup(IO->HttpReq.chnd);
480 IO->HttpReq.chnd = NULL;
481 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
482 ev_io_stop(event_base, &IO->recv_event);
483 ev_io_stop(event_base, &IO->send_event);
484 assert(IO->ShutdownAbort);
485 IO->ShutdownAbort(IO);
/*
 * Finish configuring the easy handle of an AsyncIO and start the transfer.
 * Sets the URL, optional basic-auth credentials, an optional POST body
 * (StrBuf PostData takes precedence over the plain buffer) and the header
 * list, attaches the handle to the shared multi handle, wakes the curl
 * event loop and registers a cleanup watcher that aborts the request on
 * shutdown.
 *
 * IO - request state prepared by evcurl_init().
 * NOTE(review): the return statements and error branches are on lines
 * missing from this listing.
 */
488 evcurl_handle_start(AsyncIO *IO)
494 chnd = IO->HttpReq.chnd;
496 "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
497 OPT(URL, IO->ConnectMe->PlainUrl);
498 if (StrLength(IO->ConnectMe->CurlCreds))
500 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
501 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
503 if (StrLength(IO->HttpReq.PostData) > 0)
505 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
/* POSTFIELDSIZE is a long option; the variadic setopt needs the exact type. */
506 OPT(POSTFIELDSIZE, (long)StrLength(IO->HttpReq.PostData));
509 else if ((IO->HttpReq.PlainPostDataLen != 0) &&
510 (IO->HttpReq.PlainPostData != NULL))
512 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
513 OPT(POSTFIELDSIZE, (long)IO->HttpReq.PlainPostDataLen);
515 OPT(HTTPHEADER, IO->HttpReq.headers);
517 IO->NextState = eConnect;
518 EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
519 msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
523 "EVCURL: error attaching to curl multi handle: %s\n",
524 curl_multi_strerror(msta));
527 IO->HttpReq.attached = 1;
528 ev_async_send (event_base, &WakeupCurl);
529 ev_cleanup_init(&IO->abort_by_shutdown,
530 IOcurl_abort_shutdown_callback);
532 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
/*
 * libev async callback behind WakeupCurl (signalled by
 * evcurl_handle_start()): gives curl a chance to start working on newly
 * attached handles.
 * The former call handed &global (not the CURLM handle) and a poll constant
 * (instead of an int pointer) to curl_multi_perform(); the multi handle is
 * now stepped the same way the timer callback does it.
 */
536 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
538 syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
540 stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
/*
 * Tear down the curl side of the event loop.
 * The multi handle is destroyed first; curl_global_cleanup() has to be the
 * last libcurl call, so it now runs after curl_multi_cleanup(). The log
 * line used to be a copy of the thread start-up message.
 */
543 static void evcurl_shutdown (void)
546 curl_multi_cleanup(global.mhnd);
545 curl_global_cleanup();
547 syslog(LOG_DEBUG, "EVCURL: curl multi handle and library cleaned up\n");
549 /*****************************************************************************
550 * libevent integration *
551 *****************************************************************************/
553 * client event queue plus its methods.
554 * this currently is the main loop (which may change in some future?)
556 int evbase_count = 0;
557 int event_add_pipe[2] = {-1, -1};
558 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
559 HashList *QueueEvents = NULL;
560 HashList *InboundEventQueue = NULL;
561 HashList *InboundEventQueues[2] = { NULL, NULL };
564 ev_async ExitEventLoop;
/*
 * libev async callback behind AddJob: picks up events queued for the client
 * event loop. Under EventQueueMutex the active inbound queue is switched to
 * the other of the two buffers and the previously active one is taken for
 * processing (q). Each entry then
 * gets an ID from EvIDSource if it has none and a StartIO timestamp if none
 * is set; afterwards the processed queue is emptied.
 * NOTE(review): what else is done per entry is on lines missing from this
 * listing.
 */
566 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
575 /* get the control command... */
576 pthread_mutex_lock(&EventQueueMutex);
578 if (InboundEventQueues[0] == InboundEventQueue) {
579 InboundEventQueue = InboundEventQueues[1];
580 q = InboundEventQueues[0];
583 InboundEventQueue = InboundEventQueues[0];
584 q = InboundEventQueues[1];
586 pthread_mutex_unlock(&EventQueueMutex);
587 Now = ev_now (event_base);
588 It = GetNewHashPos(q, 0);
589 while (GetNextHashPos(q, It, &len, &Key, &v))
592 if (h->IO->ID == 0) {
593 h->IO->ID = EvIDSource++;
595 if (h->IO->StartIO == 0.0)
596 h->IO->StartIO = Now;
601 DeleteHashContent(&q);
602 syslog(LOG_DEBUG, "EVENT Q Add done.\n");
/*
 * libev async callback behind ExitEventLoop: breaks out of all nested
 * ev_run() invocations on event_base and logs the exit.
 */
606 static void EventExitCallback(EV_P_ ev_async *w, int revents)
608 ev_break(event_base, EVBREAK_ALL);
610 syslog(LOG_DEBUG, "EVENT Q exiting.\n");
/*
 * Set up the data structures of the client event queue: the queue mutex,
 * the wake-up pipe, the QueueEvents hash and the two inbound queues, of
 * which the first becomes the active one. A pipe() failure is logged.
 *
 * The former setrlimit() call was removed: it passed the pipe's file
 * descriptor as the *resource* selector, i.e. it set an arbitrary process
 * limit (whichever RLIMIT_* shares the fd's number) to 1.
 * NOTE(review): the rest of the pipe() failure branch is on lines missing
 * from this listing.
 */
615 void InitEventQueue(void)
619 pthread_mutex_init(&EventQueueMutex, NULL);
621 if (pipe(event_add_pipe) != 0) {
623 "Unable to create pipe for libev queueing: %s\n",
631 QueueEvents = NewHash(1, Flathash);
632 InboundEventQueues[0] = NewHash(1, Flathash);
633 InboundEventQueues[1] = NewHash(1, Flathash);
634 InboundEventQueue = InboundEventQueues[0];
636 extern void CtdlDestroyEVCleanupHooks(void);
638 extern int EVQShutDown;
640 * this thread operates the select() etc. via libev.
/*
 * Thread body of the client event loop. Fills a system context, takes the
 * libev default loop as event_base, starts the three async watchers
 * (AddJob, ExitEventLoop, WakeupCurl), initialises the curl connection pool
 * and runs the loop until it is broken. Afterwards the loop, the event
 * hashes and the wake-up pipe are released and the EV cleanup hooks are
 * destroyed.
 *
 * arg - unused in the visible lines.
 * NOTE(review): the return statement is on a line missing from this listing.
 */
642 void *client_event_thread(void *arg)
644 struct CitContext libev_client_CC;
646 CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
647 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
648 syslog(LOG_DEBUG, "client_event_thread() initializing\n");
650 event_base = ev_default_loop (EVFLAG_AUTO);
651 ev_async_init(&AddJob, QueueEventAddCallback);
652 ev_async_start(event_base, &AddJob);
653 ev_async_init(&ExitEventLoop, EventExitCallback);
654 ev_async_start(event_base, &ExitEventLoop);
655 ev_async_init(&WakeupCurl, WakeupCurlCallback);
656 ev_async_start(event_base, &WakeupCurl);
658 curl_init_connectionpool();
/* Blocks here until EventExitCallback() breaks the loop. */
660 ev_run (event_base, 0);
662 syslog(LOG_DEBUG, "client_event_thread() exiting\n");
664 ///what todo here? CtdlClearSystemContext();
665 ev_loop_destroy (EV_DEFAULT_UC);
666 DeleteHash(&QueueEvents);
667 InboundEventQueue = NULL;
668 DeleteHash(&InboundEventQueues[0]);
669 DeleteHash(&InboundEventQueues[1]);
670 /* citthread_mutex_destroy(&EventQueueMutex); TODO */
672 close(event_add_pipe[0]);
673 close(event_add_pipe[1]);
675 CtdlDestroyEVCleanupHooks();
681 /*----------------------------------------------------------------------------*/
683 * DB-Queue; does async bdb operations.
684 * has its own set of handlers.
688 int evdb_add_pipe[2] = {-1, -1};
689 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
690 HashList *DBQueueEvents = NULL;
691 HashList *DBInboundEventQueue = NULL;
692 HashList *DBInboundEventQueues[2] = { NULL, NULL };
695 ev_async DBExitEventLoop;
697 extern void ShutDownDBCLient(AsyncIO *IO);
/*
 * libev async callback behind DBAddJob: picks up events queued for the DB
 * event loop. Under DBEventQueueMutex the active inbound queue is switched
 * to the other of the two buffers and the previously active one is taken
 * for processing (q).
 * For each entry an ID is assigned from EvIDSource, StartDB is stamped if
 * unset and the entry's EvAttch handler is invoked; afterwards the processed
 * queue is emptied.
 * NOTE(review): the conditions guarding the ID assignment and the
 * ShutDownDBCLient() call (presumably a failed EvAttch) are on lines missing
 * from this listing -- confirm.
 */
699 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
708 /* get the control command... */
709 pthread_mutex_lock(&DBEventQueueMutex);
711 if (DBInboundEventQueues[0] == DBInboundEventQueue) {
712 DBInboundEventQueue = DBInboundEventQueues[1];
713 q = DBInboundEventQueues[0];
716 DBInboundEventQueue = DBInboundEventQueues[0];
717 q = DBInboundEventQueues[1];
719 pthread_mutex_unlock(&DBEventQueueMutex);
721 Now = ev_now (event_db);
722 It = GetNewHashPos(q, 0);
723 while (GetNextHashPos(q, It, &len, &Key, &v))
728 h->IO->ID = EvIDSource++;
729 if (h->IO->StartDB == 0.0)
730 h->IO->StartDB = Now;
732 rc = h->EvAttch(h->IO);
736 ShutDownDBCLient(h->IO);
742 DeleteHashContent(&q);
743 syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
/*
 * libev async callback behind DBExitEventLoop: logs the exit and breaks out
 * of all nested ev_run() invocations on event_db.
 */
747 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
749 syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
750 ev_break(event_db, EVBREAK_ALL);
/*
 * Set up the data structures of the DB event queue: the queue mutex, the
 * wake-up pipe, the DBQueueEvents hash and the two inbound queues, of which
 * the first becomes the active one. A pipe() failure is logged at LOG_EMERG.
 *
 * The former setrlimit() call was removed: it passed the pipe's file
 * descriptor as the *resource* selector, i.e. it set an arbitrary process
 * limit (whichever RLIMIT_* shares the fd's number) to 1.
 * NOTE(review): the rest of the pipe() failure branch is on lines missing
 * from this listing.
 */
755 void DBInitEventQueue(void)
759 pthread_mutex_init(&DBEventQueueMutex, NULL);
761 if (pipe(evdb_add_pipe) != 0) {
762 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
769 DBQueueEvents = NewHash(1, Flathash);
770 DBInboundEventQueues[0] = NewHash(1, Flathash);
771 DBInboundEventQueues[1] = NewHash(1, Flathash);
772 DBInboundEventQueue = DBInboundEventQueues[0];
776 * this thread operates writing to the message database via libev.
/*
 * Thread body of the DB event loop. Fills a system context, creates a
 * dedicated libev loop (event_db), starts the DBAddJob and DBExitEventLoop
 * async watchers and runs the loop until it is broken. Afterwards the loop,
 * the DB event hashes and the wake-up pipe are released.
 *
 * arg - unused in the visible lines.
 * NOTE(review): the return statement is on a line missing from this listing.
 */
778 void *db_event_thread(void *arg)
780 struct CitContext libev_msg_CC;
782 CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
783 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
784 syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
786 event_db = ev_loop_new (EVFLAG_AUTO);
788 ev_async_init(&DBAddJob, DBQueueEventAddCallback);
789 ev_async_start(event_db, &DBAddJob);
790 ev_async_init(&DBExitEventLoop, DBEventExitCallback);
791 ev_async_start(event_db, &DBExitEventLoop);
/* Blocks here until DBEventExitCallback() breaks the loop. */
793 ev_run (event_db, 0);
795 syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
797 //// what to do here? CtdlClearSystemContext();
798 ev_loop_destroy (event_db);
800 DeleteHash(&DBQueueEvents);
801 DBInboundEventQueue = NULL;
802 DeleteHash(&DBInboundEventQueues[0]);
803 DeleteHash(&DBInboundEventQueues[1]);
805 close(evdb_add_pipe[0]);
806 close(evdb_add_pipe[1]);
807 /* citthread_mutex_destroy(&DBEventQueueMutex); TODO */
/*
 * Ask both event loops to terminate: signals DBExitEventLoop on event_db and
 * ExitEventLoop on the default loop, each while holding the mutex of the
 * respective queue. The loops exit asynchronously in their own threads
 * (see DBEventExitCallback() / EventExitCallback()).
 */
812 void ShutDownEventQueues(void)
814 syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
816 pthread_mutex_lock(&DBEventQueueMutex);
817 ev_async_send (event_db, &DBExitEventLoop);
818 pthread_mutex_unlock(&DBEventQueueMutex);
820 pthread_mutex_lock(&EventQueueMutex);
821 ev_async_send (EV_DEFAULT_ &ExitEventLoop);
822 pthread_mutex_unlock(&EventQueueMutex);
/*
 * Module entry point of the event client: spawns the client event thread
 * and the DB event thread.
 * NOTE(review): any guard around the thread creation and the returned module
 * name are on lines missing from this listing.
 */
825 CTDL_MODULE_INIT(event_client)
831 CtdlThreadCreate(client_event_thread);
832 CtdlThreadCreate(db_event_thread);