2 * Copyright (c) 1998-2012 by the citadel.org team
4 * This program is open source software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 3.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
28 #include <sys/types.h>
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
36 # include <sys/time.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
54 #include "citserver.h"
57 #include "ctdl_module.h"
59 #include "event_client.h"
60 #include "serv_curl.h"
65 /*****************************************************************************
66 * libevent / curl integration *
67 *****************************************************************************/
70 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
72 syslog(LOG_ERR, "EVCURL: error setting option " \
73 #s " on curl multi handle: %s\n", \
74 curl_easy_strerror(sta)); \
/*
 * State shared by all curl/libev glue code in this file.  Only the opening
 * line of the struct is visible here; the members used elsewhere in the file
 * are mhnd (the curl multi handle), nrun (compared against curl's
 * running-handle count) and timeev (a libev timer).
 */
79 typedef struct _evcurl_global_data {
/* The single instance used by every function below. */
87 evcurl_global_data global;
/*
 * Body of gotstatus() (name taken from the log text below and from the call
 * in stepmulti()): reads every pending message from the curl multi handle
 * and finishes the transfers that have completed.
 *
 * NOTE(review): this first message is tagged "CURLEV" while every other
 * message in the file uses "EVCURL".
 */
98 "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
99 while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
101 "EVCURL: got curl multi_info message msg=%d\n",
/* Only completed transfers are processed. */
104 if (CURLMSG_DONE == msg->msg) {
112 chnd = msg->easy_handle;
/*
 * Ask curl for the cookie stored on the easy handle; it is cast to the
 * owning AsyncIO below (evcurl_init() stores the AsyncIO as PRIVATE).
 */
113 sta = curl_easy_getinfo(chnd,
/* NOTE(review): a normal completion is logged at LOG_ERR level. */
116 syslog(LOG_ERR, "EVCURL: request complete\n");
119 "EVCURL: error asking curl for private"
120 " cookie of curl handle: %s\n",
121 curl_easy_strerror(sta));
122 IO = (AsyncIO *)chandle;
/* The transfer is over: stop watching its socket in both directions. */
124 ev_io_stop(event_base, &IO->recv_event);
125 ev_io_stop(event_base, &IO->send_event);
/* Result code of the finished transfer as reported by curl. */
127 sta = msg->data.result;
130 "EVCURL: error description: %s\n",
131 IO->HttpReq.errdesc);
133 "EVCURL: error performing request: %s\n",
134 curl_easy_strerror(sta));
/* Store the HTTP status of the response in the request record. */
136 sta = curl_easy_getinfo(chnd,
137 CURLINFO_RESPONSE_CODE,
138 &IO->HttpReq.httpcode);
141 "EVCURL: error asking curl for "
142 "response code from request: %s\n",
143 curl_easy_strerror(sta));
145 "EVCURL: http response code was %ld\n",
146 (long)IO->HttpReq.httpcode);
/*
 * Release the request header list and detach the easy handle from the
 * multi handle.
 * NOTE(review): IO->HttpReq.headers is not reset after being freed in the
 * lines visible here.
 */
149 curl_slist_free_all(IO->HttpReq.headers);
150 msta = curl_multi_remove_handle(global.mhnd, chnd);
153 "EVCURL: warning problem detaching "
154 "completed handle from curl multi: "
156 curl_multi_strerror(msta));
/* The request can no longer be aborted by shutdown; drop that watcher. */
158 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
160 IO->HttpReq.attached = 0;
/*
 * Hand the finished request to its owner.  Every branch visible here
 * destroys the easy handle and clears the pointer to it.
 */
161 switch(IO->SendDone(IO))
164 curl_easy_cleanup(IO->HttpReq.chnd);
165 IO->HttpReq.chnd = NULL;
177 curl_easy_cleanup(IO->HttpReq.chnd);
178 IO->HttpReq.chnd = NULL;
/* Terminating also releases the reply buffer, the URL and the context. */
180 case eTerminateConnection:
182 curl_easy_cleanup(IO->HttpReq.chnd);
183 IO->HttpReq.chnd = NULL;
184 FreeStrBuf(&IO->HttpReq.ReplyData);
185 FreeURL(&IO->ConnectMe);
186 RemoveContext(IO->CitContext);
/*
 * Report one event to curl through curl_multi_socket_action() on the global
 * multi handle, then call gotstatus() if the number of running handles
 * reported by curl differs from global.nrun.
 *
 * The callers in this file pass the watcher's data pointer (or NULL), the
 * socket (or CURL_SOCKET_TIMEOUT) and a CURL_CSELECT_* value (or 0).
 */
194 stepmulti(void *data, curl_socket_t fd, int which)
196 int running_handles = 0;
199 msta = curl_multi_socket_action(global.mhnd,
204 syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
/*
 * NOTE(review): the two adjacent string literals below are joined without a
 * separating space, so the message reads "...eventson multi handle...".
 */
207 "EVCURL: error in curl processing events"
208 "on multi handle, fd %d: %s\n",
210 curl_multi_strerror(msta));
/* A changed running-handle count means there is status to process. */
212 if (global.nrun != running_handles)
213 gotstatus(running_handles);
/*
 * libev timer callback (installed on global.timeev by
 * curl_init_connectionpool()): tells curl that its timeout has expired by
 * stepping the multi handle with CURL_SOCKET_TIMEOUT.
 */
217 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
219 syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
220 stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
/*
 * libev I/O callback for the read watcher set up in gotwatchsock(): reports
 * the watcher's fd to curl with CURL_CSELECT_IN.
 */
224 got_in(struct ev_loop *loop, ev_io *ioev, int events)
227 "EVCURL: waking up curl for io on fd %d\n",
230 stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
/*
 * libev I/O callback for the write watcher set up in gotwatchsock(): reports
 * the watcher's fd to curl with CURL_CSELECT_OUT.
 */
234 got_out(struct ev_loop *loop, ev_io *ioev, int events)
237 "EVCURL: waking up curl for io on fd %d\n",
240 stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
/*
 * curl write callback (registered as WRITEFUNCTION in evcurl_init(), with
 * the AsyncIO as WRITEDATA, which arrives here as cglobal).
 *
 * Creates the reply buffer on first use and passes the chunk on to
 * CurlFillStrBuf_callback(), whose return value is returned to curl.
 */
244 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
245 AsyncIO *IO = (AsyncIO*) cglobal;
/* The reply buffer is allocated lazily, on the first chunk. */
247 if (IO->HttpReq.ReplyData == NULL)
249 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
251 return CurlFillStrBuf_callback(data,
254 IO->HttpReq.ReplyData);
/*
 * curl timer callback (registered as TIMERFUNCTION in
 * curl_init_connectionpool()); cglobal is the evcurl_global_data passed as
 * TIMERDATA.  Re-arms the libev timer with the timeout curl asked for.
 */
258 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
259 syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
260 evcurl_global_data *global = cglobal;
261 ev_timer_stop(EV_DEFAULT, &global->timeev);
/*
 * Negative or larger-than-14-second requests are special-cased; the
 * statement taken in that case is not visible here (presumably a clamp).
 */
262 if (tblock_ms < 0 || 14000 < tblock_ms)
/* First expiry after tblock_ms milliseconds plus 0.5 ms; repeat every 14 s. */
264 ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
265 ev_timer_start(EV_DEFAULT_UC, &global->timeev);
/*
 * NOTE(review): `global` is the evcurl_global_data wrapper, not the CURLM
 * handle (that is global->mhnd, see gotwatchsock()).  curl_multi_perform()
 * expects the CURLM handle as its first argument - confirm and fix.
 */
266 curl_multi_perform(global, &global->nrun);
/*
 * curl socket callback (registered as SOCKETFUNCTION in
 * curl_init_connectionpool()): curl calls it to say which events it wants
 * watched on a socket.  The request is translated into starting/stopping the
 * AsyncIO's libev read and write watchers.
 */
271 gotwatchsock(CURL *easy,
277 evcurl_global_data *global = cglobal;
278 CURLM *mhnd = global->mhnd;
280 AsyncIO *IO = (AsyncIO*) vIO;
/* Fetch the private cookie stored on the easy handle. */
285 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
288 "EVCURL: error asking curl for private "
289 "cookie of curl handle: %s\n",
290 curl_easy_strerror(sta));
295 "EVCURL: got socket for URL: %s\n",
296 IO->ConnectMe->PlainUrl);
/* Stop any watchers before they are re-initialised for this fd. */
298 if (IO->SendBuf.fd != 0)
300 ev_io_stop(event_base, &IO->recv_event);
301 ev_io_stop(event_base, &IO->send_event);
/*
 * Bind both watchers to the socket and register the AsyncIO with curl as the
 * per-socket pointer.
 */
304 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
305 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
306 curl_multi_assign(mhnd, fd, IO);
/* Human-readable name of the requested action, used only for logging. */
313 Action = "CURL_POLL_NONE";
315 case CURL_POLL_REMOVE:
316 Action = "CURL_POLL_REMOVE";
319 Action = "CURL_POLL_IN";
322 Action = "CURL_POLL_OUT";
324 case CURL_POLL_INOUT:
325 Action = "CURL_POLL_INOUT";
331 "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
332 (int)fd, Action, action);
/* NOTE(review): "sockwatcker" below is a typo in the log text. */
338 "EVCURL: called first time "
339 "to register this sockwatcker\n");
/* Removal: stop watching the socket in both directions. */
341 case CURL_POLL_REMOVE:
343 "EVCURL: called last time to unregister "
344 "this sockwatcher\n");
345 ev_io_stop(event_base, &IO->recv_event);
346 ev_io_stop(event_base, &IO->send_event);
/* Read interest only (case label not visible; presumably CURL_POLL_IN). */
349 ev_io_start(event_base, &IO->recv_event);
350 ev_io_stop(event_base, &IO->send_event);
/* Write interest only (case label not visible; presumably CURL_POLL_OUT). */
353 ev_io_start(event_base, &IO->send_event);
354 ev_io_stop(event_base, &IO->recv_event);
/* Interest in both directions. */
356 case CURL_POLL_INOUT:
357 ev_io_start(event_base, &IO->send_event);
358 ev_io_start(event_base, &IO->recv_event);
/*
 * One-time setup of the curl side of the event client: prepares the timeout
 * timer, initialises libcurl, creates the global multi handle and registers
 * the socket and timer callbacks on it.  Called from client_event_thread().
 */
364 void curl_init_connectionpool(void)
/* Timer that calls gottime(); 14 s initial delay and 14 s repeat. */
368 ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
369 global.timeev.data = (void *)&global;
371 CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
376 "EVCURL: error initializing curl library: %s\n",
377 curl_easy_strerror(sta));
381 mhnd = global.mhnd = curl_multi_init();
385 "EVCURL: error initializing curl multi handle\n");
/* Both callbacks receive a pointer to `global` as their user data. */
389 MOPT(SOCKETFUNCTION, &gotwatchsock);
390 MOPT(SOCKETDATA, (void *)&global);
391 MOPT(TIMERFUNCTION, &gotwatchtime);
392 MOPT(TIMERDATA, (void *)&global);
/*
 * Create and configure the curl easy handle for one HTTP request described
 * by IO.  The handle is stored in IO->HttpReq.chnd; it is not attached to
 * the multi handle here (see evcurl_handle_start()).
 */
397 int evcurl_init(AsyncIO *IO)
/* NOTE(review): the trailing " ms" in this message looks like a leftover. */
402 EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
403 IO->HttpReq.attached = 0;
404 chnd = IO->HttpReq.chnd = curl_easy_init();
407 EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
412 OPT(VERBOSE, (long)1);
/* HTTP errors fail the transfer; redirects are not followed. */
417 OPT(FAILONERROR, (long)1);
419 OPT(FOLLOWLOCATION, (long)0);
420 OPT(MAXREDIRS, (long)0);
421 OPT(USERAGENT, CITADEL);
/* Timeouts in seconds and the low-speed threshold in bytes per second. */
423 OPT(TIMEOUT, (long)1800);
424 OPT(LOW_SPEED_LIMIT, (long)64);
425 OPT(LOW_SPEED_TIME, (long)600);
426 OPT(CONNECTTIMEOUT, (long)600);
/* The AsyncIO travels with the handle; gotstatus() reads it back. */
427 OPT(PRIVATE, (void *)IO);
/*
 * NOTE(review): unlike the options above, 1 is passed without a (long) cast;
 * the OPT macro is not visible here, so confirm it does not forward an int
 * to curl_easy_setopt()'s varargs.
 */
429 OPT(FORBID_REUSE, 1);
/* Response data is collected by gotdata() into IO->HttpReq.ReplyData. */
430 OPT(WRITEFUNCTION, &gotdata);
431 OPT(WRITEDATA, (void *)IO);
432 OPT(ERRORBUFFER, IO->HttpReq.errdesc);
/*
 * Bind outgoing connections to the configured address unless it is empty or
 * one of the wildcard spellings.
 */
434 if ((!IsEmptyStr(config.c_ip_addr))
435 && (strcmp(config.c_ip_addr, "*"))
436 && (strcmp(config.c_ip_addr, "::"))
437 && (strcmp(config.c_ip_addr, "0.0.0.0"))
440 OPT(INTERFACE, config.c_ip_addr);
443 #ifdef CURLOPT_HTTP_CONTENT_DECODING
444 OPT(HTTP_CONTENT_DECODING, 1);
/* Start the request header list; it is handed to curl in evcurl_handle_start(). */
448 IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
449 "Connection: close");
/*
 * libev cleanup callback (installed by evcurl_handle_start()) that aborts a
 * request still in flight: frees the header list, detaches and destroys the
 * easy handle, stops the request's watchers and finally calls the owner's
 * ShutdownAbort handler.
 */
455 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
460 AsyncIO *IO = watcher->data;
461 EV_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
463 curl_slist_free_all(IO->HttpReq.headers);
464 msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
468 "EVCURL: warning problem detaching completed handle "
469 "from curl multi: %s\n",
470 curl_multi_strerror(msta));
473 curl_easy_cleanup(IO->HttpReq.chnd);
474 IO->HttpReq.chnd = NULL;
475 ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
476 ev_io_stop(event_base, &IO->recv_event);
477 ev_io_stop(event_base, &IO->send_event);
/* A ShutdownAbort handler is mandatory for requests using this path. */
478 assert(IO->ShutdownAbort);
479 IO->ShutdownAbort(IO);
/*
 * Finish configuring the easy handle created by evcurl_init() (URL,
 * credentials, POST body, headers), attach it to the global multi handle and
 * signal the WakeupCurl async watcher.
 */
482 evcurl_handle_start(AsyncIO *IO)
488 chnd = IO->HttpReq.chnd;
490 "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
491 OPT(URL, IO->ConnectMe->PlainUrl);
/* Basic authentication, only when credentials are present. */
492 if (StrLength(IO->ConnectMe->CurlCreds))
494 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
495 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
/*
 * POST body: the StrBuf PostData is used when non-empty, otherwise the plain
 * buffer if one is set.
 * NOTE(review): the POSTFIELDSIZE values are passed without a (long) cast;
 * confirm their types match what curl_easy_setopt() expects.
 */
497 if (StrLength(IO->HttpReq.PostData) > 0)
499 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
500 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
503 else if ((IO->HttpReq.PlainPostDataLen != 0) &&
504 (IO->HttpReq.PlainPostData != NULL))
506 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
507 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
509 OPT(HTTPHEADER, IO->HttpReq.headers);
511 IO->NextState = eConnect;
512 EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
513 msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
517 "EVCURL: error attaching to curl multi handle: %s\n",
518 curl_multi_strerror(msta));
521 IO->HttpReq.attached = 1;
/* Signal the event loop so WakeupCurlCallback() runs. */
522 ev_async_send (event_base, &WakeupCurl);
/* Arrange for the request to be aborted when the loop is destroyed. */
523 ev_cleanup_init(&IO->abort_by_shutdown,
524 IOcurl_abort_shutdown_callback);
526 ev_cleanup_start(event_base, &IO->abort_by_shutdown);
530 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
532 syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
534 curl_multi_perform(&global, CURL_POLL_NONE);
537 static void evcurl_shutdown (void)
539 curl_global_cleanup();
540 curl_multi_cleanup(global.mhnd);
541 syslog(LOG_DEBUG, "client_event_thread() initializing\n");
543 /*****************************************************************************
544 * libevent integration *
545 *****************************************************************************/
547 * client event queue plus its methods.
548 * this currently is the main loop (which may change in some future?)
/*
 * Global state of the client event queue.  InboundEventQueue always points
 * at one of the two InboundEventQueues entries; QueueEventAddCallback()
 * switches it to the other one under EventQueueMutex.
 */
550 int evbase_count = 0;
551 int event_add_pipe[2] = {-1, -1};
552 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
553 HashList *QueueEvents = NULL;
554 HashList *InboundEventQueue = NULL;
555 HashList *InboundEventQueues[2] = { NULL, NULL };
/* Async watcher that ShutDownEventQueues() signals to stop the client loop. */
558 ev_async ExitEventLoop;
/*
 * libev async callback for the AddJob watcher (see client_event_thread()):
 * takes over the queue that has been collecting new jobs and processes its
 * entries.
 */
560 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
568 /* get the control command... */
/*
 * Under the mutex, make the other queue the active inbound queue and keep
 * the previously active one in q; q is then walked without holding the lock.
 */
569 pthread_mutex_lock(&EventQueueMutex);
571 if (InboundEventQueues[0] == InboundEventQueue) {
572 InboundEventQueue = InboundEventQueues[1];
573 q = InboundEventQueues[0];
576 InboundEventQueue = InboundEventQueues[0];
577 q = InboundEventQueues[1];
579 pthread_mutex_unlock(&EventQueueMutex);
581 It = GetNewHashPos(q, 0);
582 while (GetNextHashPos(q, It, &len, &Key, &v))
/* Each job's IO gets the next value of the EvIDSource counter as its ID. */
586 h->IO->ID = EvIDSource++;
/* Empty the drained queue so it can be reused after the next swap. */
590 DeleteHashContent(&q);
591 syslog(LOG_DEBUG, "EVENT Q Add done.\n");
/*
 * libev async callback for ExitEventLoop: breaks out of the client event
 * loop so that ev_run() in client_event_thread() returns.
 */
595 static void EventExitCallback(EV_P_ ev_async *w, int revents)
597 ev_break(event_base, EVBREAK_ALL);
599 syslog(LOG_DEBUG, "EVENT Q exiting.\n");
/*
 * Initialise the client event queue: the mutex, the notification pipe and
 * the hash lists.  Called from the module init hook before the event thread
 * is created.
 */
604 void InitEventQueue(void)
606 struct rlimit LimitSet;
608 pthread_mutex_init(&EventQueueMutex, NULL);
610 if (pipe(event_add_pipe) != 0) {
612 "Unable to create pipe for libev queueing: %s\n",
/*
 * NOTE(review): setrlimit() takes a resource identifier (RLIMIT_*) as its
 * first argument, but the write end of the pipe is passed here and the
 * result is not checked.  Confirm what was intended.
 */
616 LimitSet.rlim_cur = 1;
617 LimitSet.rlim_max = 1;
618 setrlimit(event_add_pipe[1], &LimitSet);
/* The first inbound queue starts out as the active one. */
620 QueueEvents = NewHash(1, Flathash);
621 InboundEventQueues[0] = NewHash(1, Flathash);
622 InboundEventQueues[1] = NewHash(1, Flathash);
623 InboundEventQueue = InboundEventQueues[0];
627 * this thread operates the select() etc. via libev.
/*
 * Thread entry point for the client event loop.  Sets up the default libev
 * loop with the three async watchers (add job, exit, wake up curl),
 * initialises curl, runs the loop until it is broken and then releases the
 * queue resources.
 */
629 void *client_event_thread(void *arg)
631 struct CitContext libev_client_CC;
633 CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
634 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
635 syslog(LOG_DEBUG, "client_event_thread() initializing\n");
/* This thread uses libev's default loop. */
637 event_base = ev_default_loop (EVFLAG_AUTO);
638 ev_async_init(&AddJob, QueueEventAddCallback);
639 ev_async_start(event_base, &AddJob);
640 ev_async_init(&ExitEventLoop, EventExitCallback);
641 ev_async_start(event_base, &ExitEventLoop);
642 ev_async_init(&WakeupCurl, WakeupCurlCallback);
643 ev_async_start(event_base, &WakeupCurl);
645 curl_init_connectionpool();
/* Returns once EventExitCallback() has called ev_break(). */
647 ev_run (event_base, 0);
649 syslog(LOG_DEBUG, "client_event_thread() exiting\n");
651 ///what todo here? CtdlClearSystemContext();
652 ev_loop_destroy (EV_DEFAULT_UC);
653 DeleteHash(&QueueEvents);
654 InboundEventQueue = NULL;
655 DeleteHash(&InboundEventQueues[0]);
656 DeleteHash(&InboundEventQueues[1]);
657 /* citthread_mutex_destroy(&EventQueueMutex); TODO */
659 close(event_add_pipe[0]);
660 close(event_add_pipe[1]);
665 /*----------------------------------------------------------------------------*/
667 * DB-Queue; does async bdb operations.
668 * has its own set of handlers.
/*
 * Global state of the DB event queue; it mirrors the client event queue
 * variables.  DBInboundEventQueue always points at one of the two
 * DBInboundEventQueues entries.
 */
672 int evdb_add_pipe[2] = {-1, -1};
673 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
674 HashList *DBQueueEvents = NULL;
675 HashList *DBInboundEventQueue = NULL;
676 HashList *DBInboundEventQueues[2] = { NULL, NULL };
/* Async watcher that ShutDownEventQueues() signals to stop the DB loop. */
679 ev_async DBExitEventLoop;
/* Defined elsewhere; called below on jobs picked up by DBQueueEventAddCallback(). */
681 extern void ShutDownDBCLient(AsyncIO *IO);
/*
 * libev async callback for the DBAddJob watcher (see db_event_thread()):
 * takes over the queue that has been collecting new DB jobs and runs each
 * job's EvAttch handler.
 */
683 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
691 /* get the control command... */
/*
 * Under the mutex, make the other queue the active inbound queue and keep
 * the previously active one in q; q is then walked without holding the lock.
 */
692 pthread_mutex_lock(&DBEventQueueMutex);
694 if (DBInboundEventQueues[0] == DBInboundEventQueue) {
695 DBInboundEventQueue = DBInboundEventQueues[1];
696 q = DBInboundEventQueues[0];
699 DBInboundEventQueue = DBInboundEventQueues[0];
700 q = DBInboundEventQueues[1];
702 pthread_mutex_unlock(&DBEventQueueMutex);
704 It = GetNewHashPos(q, 0);
705 while (GetNextHashPos(q, It, &len, &Key, &v))
/* Assign the job an ID, then run its attach handler. */
710 h->IO->ID = EvIDSource++;
711 rc = h->EvAttch(h->IO);
/* The condition selecting this call (presumably based on rc) is not visible here. */
715 ShutDownDBCLient(h->IO);
/* Empty the drained queue so it can be reused after the next swap. */
721 DeleteHashContent(&q);
722 syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
/*
 * libev async callback for DBExitEventLoop: breaks out of the DB event loop
 * so that ev_run() in db_event_thread() returns.
 */
726 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
728 syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
729 ev_break(event_db, EVBREAK_ALL);
/*
 * Initialise the DB event queue: the mutex, the notification pipe and the
 * hash lists.  Counterpart of InitEventQueue() for the DB loop.
 */
734 void DBInitEventQueue(void)
736 struct rlimit LimitSet;
738 pthread_mutex_init(&DBEventQueueMutex, NULL);
740 if (pipe(evdb_add_pipe) != 0) {
741 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
/*
 * NOTE(review): setrlimit() takes a resource identifier (RLIMIT_*) as its
 * first argument, but the write end of the pipe is passed here and the
 * result is not checked.  Confirm what was intended.
 */
744 LimitSet.rlim_cur = 1;
745 LimitSet.rlim_max = 1;
746 setrlimit(evdb_add_pipe[1], &LimitSet);
/* The first inbound queue starts out as the active one. */
748 DBQueueEvents = NewHash(1, Flathash);
749 DBInboundEventQueues[0] = NewHash(1, Flathash);
750 DBInboundEventQueues[1] = NewHash(1, Flathash);
751 DBInboundEventQueue = DBInboundEventQueues[0];
755 * this thread operates writing to the message database via libev.
/*
 * Thread entry point for the DB event loop.  Creates a separate libev loop
 * (not the default one), registers the add-job and exit async watchers,
 * runs the loop until it is broken and then releases the queue resources.
 */
757 void *db_event_thread(void *arg)
759 struct CitContext libev_msg_CC;
761 CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
762 // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
763 syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
765 event_db = ev_loop_new (EVFLAG_AUTO);
767 ev_async_init(&DBAddJob, DBQueueEventAddCallback);
768 ev_async_start(event_db, &DBAddJob);
769 ev_async_init(&DBExitEventLoop, DBEventExitCallback);
770 ev_async_start(event_db, &DBExitEventLoop);
/* Returns once DBEventExitCallback() has called ev_break(). */
772 ev_run (event_db, 0);
774 syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
776 //// what to do here? CtdlClearSystemContext();
777 ev_loop_destroy (event_db);
779 DeleteHash(&DBQueueEvents);
780 DBInboundEventQueue = NULL;
781 DeleteHash(&DBInboundEventQueues[0]);
782 DeleteHash(&DBInboundEventQueues[1]);
784 close(evdb_add_pipe[0]);
785 close(evdb_add_pipe[1]);
786 /* citthread_mutex_destroy(&DBEventQueueMutex); TODO */
/*
 * Cleanup hook (registered in the module init below): asks both event loops
 * to exit by signalling their exit watchers.  Each signal is sent while
 * holding the corresponding queue mutex.
 */
791 void ShutDownEventQueues(void)
793 syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
795 pthread_mutex_lock(&DBEventQueueMutex);
796 ev_async_send (event_db, &DBExitEventLoop);
797 pthread_mutex_unlock(&DBEventQueueMutex);
799 pthread_mutex_lock(&EventQueueMutex);
/* EV_DEFAULT_ is libev's "default loop plus trailing comma" macro. */
800 ev_async_send (EV_DEFAULT_ &ExitEventLoop);
801 pthread_mutex_unlock(&EventQueueMutex);
/*
 * Module entry point: registers the shutdown hook and starts the two event
 * loop threads.  Lines between the ones shown are not visible here.
 */
804 CTDL_MODULE_INIT(event_client)
808 CtdlRegisterCleanupHook(ShutDownEventQueues);
811 CtdlThreadCreate(client_event_thread);
812 CtdlThreadCreate(db_event_thread);