X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Feventclient%2Fserv_eventclient.c;h=39bb108a7af1dce3e03fc2ef65f8fe39e1a5eb70;hb=ef347e598ab670b87e178af7fc6b00795494303a;hp=53cbea4bd95e440734e2935fe84873ff8c438a3c;hpb=89e1a0f46082894132a09bd7e6401ee0ef1b013c;p=citadel.git diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 53cbea4bd..39bb108a7 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -1,19 +1,19 @@ /* - * Copyright (c) 1998-2009 by the citadel.org team + * Copyright (c) 1998-2012 by the citadel.org team * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. + * This program is open source software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 3. + * + * * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * + * */ #include "sysdep.h" @@ -60,79 +60,125 @@ #include "serv_curl.h" ev_loop *event_base; +int DebugEventLoop = 0; +int DebugCurl = 0; long EvIDSource = 1; /***************************************************************************** * libevent / curl integration * *****************************************************************************/ -#define MOPT(s, v) \ - do { \ - sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \ - if (sta) { \ - syslog(LOG_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_easy_strerror(sta)); \ - exit (1); \ - } \ - } while (0) +#define MOPT(s, v)\ + do { \ + sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \ + if (sta) { \ + syslog(LOG_ERR, "EVCURL: error setting option " \ + #s " on curl multi handle: %s\n", \ + curl_easy_strerror(sta)); \ + exit (1); \ + } \ + } while (0) + +#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0)) + +#define EVCURL_syslog(LEVEL, FORMAT, ...) \ + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, IO->ID, CCID, __VA_ARGS__) + +#define EVCURLM_syslog(LEVEL, FORMAT) \ + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, IO->ID, CCID) + +#define CURL_syslog(LEVEL, FORMAT, ...) \ + DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__) + +#define CURLM_syslog(LEVEL, FORMAT) \ + DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT) typedef struct _evcurl_global_data { - int magic; - CURLM *mhnd; - ev_timer timeev; - int nrun; + int magic; + CURLM *mhnd; + ev_timer timeev; + int nrun; } evcurl_global_data; ev_async WakeupCurl; evcurl_global_data global; static void -gotstatus(int nnrun) +gotstatus(int nnrun) { - CURLMsg *msg; - int nmsg; - - global.nrun = nnrun; - - syslog(LOG_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n"); - while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) { - syslog(LOG_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg); - if (CURLMSG_DONE == msg->msg) { - CURL *chnd; - char *chandle; - CURLcode sta; - CURLMcode msta; - AsyncIO *IO; - - chandle = NULL;; - chnd = msg->easy_handle; - sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle); - syslog(LOG_ERR, "EVCURL: request complete\n"); - if (sta) - syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta)); - IO = (AsyncIO *)chandle; - - ev_io_stop(event_base, &IO->recv_event); - ev_io_stop(event_base, &IO->send_event); - - sta = msg->data.result; - if (sta) { - EV_syslog(LOG_ERR, "EVCURL: error description: %s\n", IO->HttpReq.errdesc); - EV_syslog(LOG_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta)); - } - sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &IO->HttpReq.httpcode); - if (sta) - EV_syslog(LOG_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta)); - EV_syslog(LOG_ERR, "EVCURL: http response code was %ld\n", (long)IO->HttpReq.httpcode); - - - curl_slist_free_all(IO->HttpReq.headers); - msta = curl_multi_remove_handle(global.mhnd, chnd); - if (msta) - EV_syslog(LOG_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta)); + CURLMsg *msg; + int nmsg; + + global.nrun = nnrun; + + CURLM_syslog(LOG_DEBUG, + "gotstatus(): about to call curl_multi_info_read\n"); + while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) { + CURL_syslog(LOG_DEBUG, + "got curl multi_info message msg=%d\n", + msg->msg); + + if (CURLMSG_DONE == msg->msg) { + CURL *chnd; + char *chandle = NULL; + CURLcode sta; + CURLMcode msta; + AsyncIO*IO; + + chandle = NULL;; + chnd = msg->easy_handle; + sta = curl_easy_getinfo(chnd, + CURLINFO_PRIVATE, + &chandle); + EVCURLM_syslog(LOG_DEBUG, "request complete\n"); + if (sta) { + EVCURL_syslog(LOG_ERR, + "error asking curl for private" + " cookie of curl handle: %s\n", + curl_easy_strerror(sta)); + continue; + } + IO = (AsyncIO *)chandle; + + IO->Now = ev_now(event_base); + + ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); + + sta = msg->data.result; + if (sta) { + EVCURL_syslog(LOG_ERR, + "error description: %s\n", + IO->HttpReq.errdesc); + EVCURL_syslog(LOG_ERR, + "error performing request: %s\n", + curl_easy_strerror(sta)); + } + sta = curl_easy_getinfo(chnd, + CURLINFO_RESPONSE_CODE, + &IO->HttpReq.httpcode); + if (sta) + EVCURL_syslog(LOG_ERR, + "error asking curl for " + "response code from request: %s\n", + curl_easy_strerror(sta)); + EVCURL_syslog(LOG_ERR, + "http response code was %ld\n", + (long)IO->HttpReq.httpcode); + + + curl_slist_free_all(IO->HttpReq.headers); + msta = curl_multi_remove_handle(global.mhnd, chnd); + if (msta) + EVCURL_syslog(LOG_ERR, + "warning problem detaching " + "completed handle from curl multi: " + "%s\n", + curl_multi_strerror(msta)); ev_cleanup_stop(event_base, &IO->abort_by_shutdown); - IO->HttpReq.attached = 0; - switch(IO->SendDone(IO)) + IO->HttpReq.attached = 0; + switch(IO->SendDone(IO)) { case eDBQuery: curl_easy_cleanup(IO->HttpReq.chnd); @@ -141,10 +187,10 @@ gotstatus(int nnrun) case eSendDNSQuery: case eReadDNSReply: case eConnect: - case eSendReply: + case eSendReply: case eSendMore: case eSendFile: - case eReadMessage: + case eReadMessage: case eReadMore: case eReadPayload: case eReadFile: @@ -160,83 +206,116 @@ gotstatus(int nnrun) RemoveContext(IO->CitContext); IO->Terminate(IO); } - } - } + } + } } static void stepmulti(void *data, curl_socket_t fd, int which) { - int running_handles = 0; - CURLMcode msta; - - msta = curl_multi_socket_action(global.mhnd, fd, which, &running_handles); - syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n"); - if (msta) - syslog(LOG_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta)); - if (global.nrun != running_handles) - gotstatus(running_handles); + int running_handles = 0; + CURLMcode msta; + + msta = curl_multi_socket_action(global.mhnd, + fd, + which, + &running_handles); + + CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n"); + if (msta) + CURL_syslog(LOG_ERR, + "error in curl processing events" + "on multi handle, fd %d: %s\n", + (int)fd, + curl_multi_strerror(msta)); + + if (global.nrun != running_handles) + gotstatus(running_handles); } static void -gottime(struct ev_loop *loop, ev_timer *timeev, int events) { - syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n"); - stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0); +gottime(struct ev_loop *loop, ev_timer *timeev, int events) +{ + CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n"); + stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0); } static void -got_in(struct ev_loop *loop, ev_io *ioev, int events) { - syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd); - stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN); +got_in(struct ev_loop *loop, ev_io *ioev, int events) +{ + CURL_syslog(LOG_DEBUG, + "EVCURL: waking up curl for io on fd %d\n", + (int)ioev->fd); + + stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN); } static void -got_out(struct ev_loop *loop, ev_io *ioev, int events) { - syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd); - stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT); +got_out(struct ev_loop *loop, ev_io *ioev, int events) +{ + CURL_syslog(LOG_DEBUG, + "waking up curl for io on fd %d\n", + (int)ioev->fd); + + stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT); } static size_t gotdata(void *data, size_t size, size_t nmemb, void *cglobal) { - AsyncIO *IO = (AsyncIO*) cglobal; + AsyncIO *IO = (AsyncIO*) cglobal; - if (IO->HttpReq.ReplyData == NULL) - { - IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ); - } - return CurlFillStrBuf_callback(data, size, nmemb, IO->HttpReq.ReplyData); + if (IO->HttpReq.ReplyData == NULL) + { + IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ); + } + IO->Now = ev_now(event_base); + return CurlFillStrBuf_callback(data, + size, + nmemb, + IO->HttpReq.ReplyData); } static int gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) { - syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms); - evcurl_global_data *global = cglobal; - ev_timer_stop(EV_DEFAULT, &global->timeev); - if (tblock_ms < 0 || 14000 < tblock_ms) - tblock_ms = 14000; - ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0); - ev_timer_start(EV_DEFAULT_UC, &global->timeev); - curl_multi_perform(global, &global->nrun); - return 0; + CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms); + evcurl_global_data *global = cglobal; + ev_timer_stop(EV_DEFAULT, &global->timeev); + if (tblock_ms < 0 || 14000 < tblock_ms) + tblock_ms = 14000; + ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0); + ev_timer_start(EV_DEFAULT_UC, &global->timeev); + curl_multi_perform(global, &global->nrun); + return 0; } static int -gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) { - evcurl_global_data *global = cglobal; - CURLM *mhnd = global->mhnd; - char *f; - AsyncIO *IO = (AsyncIO*) vIO; - CURLcode sta; +gotwatchsock(CURL *easy, + curl_socket_t fd, + int action, + void *cglobal, + void *vIO) +{ + evcurl_global_data *global = cglobal; + CURLM *mhnd = global->mhnd; + char *f; + AsyncIO *IO = (AsyncIO*) vIO; + CURLcode sta; const char *Action; if (IO == NULL) { - sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); - if (sta) { - EV_syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta)); - return -1; - } - IO = (AsyncIO *) f; - EV_syslog(LOG_DEBUG, "EVCURL: got socket for URL: %s\n", IO->ConnectMe->PlainUrl); + sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); + if (sta) { + EVCURL_syslog(LOG_ERR, + "EVCURL: error asking curl for private " + "cookie of curl handle: %s\n", + curl_easy_strerror(sta)); + return -1; + } + IO = (AsyncIO *) f; + EVCURL_syslog(LOG_DEBUG, + "EVCURL: got socket for URL: %s\n", + IO->ConnectMe->PlainUrl); + if (IO->SendBuf.fd != 0) { ev_io_stop(event_base, &IO->recv_event); @@ -248,182 +327,173 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) curl_multi_assign(mhnd, fd, IO); } + IO->Now = ev_now(event_base); + Action = ""; switch (action) { case CURL_POLL_NONE: - Action = "CURL_POLL_NONE"; + Action = "CURL_POLL_NONE"; break; case CURL_POLL_REMOVE: - Action = "CURL_POLL_REMOVE"; + Action = "CURL_POLL_REMOVE"; break; case CURL_POLL_IN: - Action = "CURL_POLL_IN"; + Action = "CURL_POLL_IN"; break; case CURL_POLL_OUT: - Action = "CURL_POLL_OUT"; + Action = "CURL_POLL_OUT"; break; case CURL_POLL_INOUT: - Action = "CURL_POLL_INOUT"; + Action = "CURL_POLL_INOUT"; break; - } + } - EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", (int)fd, Action, action); + EVCURL_syslog(LOG_DEBUG, + "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", + (int)fd, Action, action); switch (action) { case CURL_POLL_NONE: - EVM_syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n"); + EVCURLM_syslog(LOG_ERR, + "called first time " + "to register this sockwatcker\n"); break; case CURL_POLL_REMOVE: - EVM_syslog(LOG_ERR,"EVCURL: called last time to unregister this sockwatcher\n"); - ev_io_stop(event_base, &IO->recv_event); - ev_io_stop(event_base, &IO->send_event); + EVCURLM_syslog(LOG_ERR, + "called last time to unregister " + "this sockwatcher\n"); + ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); break; case CURL_POLL_IN: - ev_io_start(event_base, &IO->recv_event); - ev_io_stop(event_base, &IO->send_event); + ev_io_start(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); break; case CURL_POLL_OUT: - ev_io_start(event_base, &IO->send_event); - ev_io_stop(event_base, &IO->recv_event); + ev_io_start(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); break; case CURL_POLL_INOUT: - ev_io_start(event_base, &IO->send_event); - ev_io_start(event_base, &IO->recv_event); + ev_io_start(event_base, &IO->send_event); + ev_io_start(event_base, &IO->recv_event); break; - } - return 0; + } + return 0; } -void curl_init_connectionpool(void) +void curl_init_connectionpool(void) { - CURLM *mhnd ; - - ev_timer_init(&global.timeev, &gottime, 14.0, 14.0); - global.timeev.data = (void *)&global; - global.nrun = -1; - CURLcode sta = curl_global_init(CURL_GLOBAL_ALL); - - if (sta) - { - syslog(LOG_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta)); - exit(1); - } - mhnd = global.mhnd = curl_multi_init(); - if (!mhnd) - { - syslog(LOG_ERR,"EVCURL: error initializing curl multi handle\n"); - exit(3); - } - - MOPT(SOCKETFUNCTION, &gotwatchsock); - MOPT(SOCKETDATA, (void *)&global); - MOPT(TIMERFUNCTION, &gotwatchtime); - MOPT(TIMERDATA, (void *)&global); - - return; -} + CURLM *mhnd ; + ev_timer_init(&global.timeev, &gottime, 14.0, 14.0); + global.timeev.data = (void *)&global; + global.nrun = -1; + CURLcode sta = curl_global_init(CURL_GLOBAL_ALL); + if (sta) + { + CURL_syslog(LOG_ERR, + "error initializing curl library: %s\n", + curl_easy_strerror(sta)); + exit(1); + } + mhnd = global.mhnd = curl_multi_init(); + if (!mhnd) + { + CURLM_syslog(LOG_ERR, + "error initializing curl multi handle\n"); + exit(3); + } + + MOPT(SOCKETFUNCTION, &gotwatchsock); + MOPT(SOCKETDATA, (void *)&global); + MOPT(TIMERFUNCTION, &gotwatchtime); + MOPT(TIMERDATA, (void *)&global); -int evcurl_init(AsyncIO *IO, - void *CustomData, - const char* Desc, - IO_CallBack CallBack, - IO_CallBack Terminate, - IO_CallBack ShutdownAbort) + return; +} + +int evcurl_init(AsyncIO *IO) { - CURLcode sta; - CURL *chnd; - - EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n"); - IO->HttpReq.attached = 0; - IO->SendDone = CallBack; - IO->Terminate = Terminate; - IO->ShutdownAbort = ShutdownAbort; - chnd = IO->HttpReq.chnd = curl_easy_init(); - if (!chnd) - { - EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n"); - return 1; - } - - strcpy(IO->HttpReq.errdesc, Desc); - - OPT(VERBOSE, (long)1); - /* unset in production */ - OPT(NOPROGRESS, 1L); - OPT(NOSIGNAL, 1L); - OPT(FAILONERROR, (long)1); - OPT(ENCODING, ""); - OPT(FOLLOWLOCATION, (long)0); - OPT(MAXREDIRS, (long)0); - OPT(USERAGENT, CITADEL); - - OPT(TIMEOUT, (long)1800); - OPT(LOW_SPEED_LIMIT, (long)64); - OPT(LOW_SPEED_TIME, (long)600); - OPT(CONNECTTIMEOUT, (long)600); - OPT(PRIVATE, (void *)IO); - - OPT(FORBID_REUSE, 1); - OPT(WRITEFUNCTION, &gotdata); + CURLcode sta; + CURL *chnd; + + EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n"); + IO->HttpReq.attached = 0; + chnd = IO->HttpReq.chnd = curl_easy_init(); + if (!chnd) + { + EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n"); + return 0; + } + +#if DEBUG + OPT(VERBOSE, (long)1); +#endif + OPT(NOPROGRESS, 1L); + + OPT(NOSIGNAL, 1L); + OPT(FAILONERROR, (long)1); + OPT(ENCODING, ""); + OPT(FOLLOWLOCATION, (long)0); + OPT(MAXREDIRS, (long)0); + OPT(USERAGENT, CITADEL); + + OPT(TIMEOUT, (long)1800); + OPT(LOW_SPEED_LIMIT, (long)64); + OPT(LOW_SPEED_TIME, (long)600); + OPT(CONNECTTIMEOUT, (long)600); + OPT(PRIVATE, (void *)IO); + + OPT(FORBID_REUSE, 1); + OPT(WRITEFUNCTION, &gotdata); OPT(WRITEDATA, (void *)IO); OPT(ERRORBUFFER, IO->HttpReq.errdesc); - if ( - (!IsEmptyStr(config.c_ip_addr)) + if ((!IsEmptyStr(config.c_ip_addr)) && (strcmp(config.c_ip_addr, "*")) && (strcmp(config.c_ip_addr, "::")) && (strcmp(config.c_ip_addr, "0.0.0.0")) - ) { - OPT(INTERFACE, config.c_ip_addr); - } - /* point to a structure that points back to the perl structure and stuff */ - EV_syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); - OPT(URL, IO->ConnectMe->PlainUrl); - if (StrLength(IO->ConnectMe->CurlCreds)) + ) { - OPT(HTTPAUTH, (long)CURLAUTH_BASIC); - OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds)); + OPT(INTERFACE, config.c_ip_addr); } + #ifdef CURLOPT_HTTP_CONTENT_DECODING OPT(HTTP_CONTENT_DECODING, 1); OPT(ENCODING, ""); #endif - if (StrLength(IO->HttpReq.PostData) > 0) - { - OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData)); - OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData)); - - } - else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL)) - { - OPT(POSTFIELDS, IO->HttpReq.PlainPostData); - OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen); - } - IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Connection: close"); - OPT(HTTPHEADER, IO->HttpReq.headers); + IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, + "Connection: close"); return 1; } -static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents) +static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) { - CURLMcode msta; + CURLMcode msta; AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); + IO->Now = ev_now(event_base); + EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); curl_slist_free_all(IO->HttpReq.headers); msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd); if (msta) - EV_syslog(LOG_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta)); - + { + EVCURL_syslog(LOG_ERR, + "EVCURL: warning problem detaching completed handle " + "from curl multi: %s\n", + curl_multi_strerror(msta)); + } + curl_easy_cleanup(IO->HttpReq.chnd); IO->HttpReq.chnd = NULL; ev_cleanup_stop(event_base, &IO->abort_by_shutdown); @@ -433,25 +503,57 @@ static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *wat IO->ShutdownAbort(IO); } eNextState -evcurl_handle_start(AsyncIO *IO) +evcurl_handle_start(AsyncIO *IO) { CURLMcode msta; + CURLcode sta; + CURL *chnd; + + chnd = IO->HttpReq.chnd; + EVCURL_syslog(LOG_DEBUG, + "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); + OPT(URL, IO->ConnectMe->PlainUrl); + if (StrLength(IO->ConnectMe->CurlCreds)) + { + OPT(HTTPAUTH, (long)CURLAUTH_BASIC); + OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds)); + } + if (StrLength(IO->HttpReq.PostData) > 0) + { + OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData)); + OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData)); + + } + else if ((IO->HttpReq.PlainPostDataLen != 0) && + (IO->HttpReq.PlainPostData != NULL)) + { + OPT(POSTFIELDS, IO->HttpReq.PlainPostData); + OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen); + } + OPT(HTTPHEADER, IO->HttpReq.headers); + IO->NextState = eConnect; - EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n"); + EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n"); msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd); if (msta) - EV_syslog(LOG_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta)); + { + EVCURL_syslog(LOG_ERR, + "EVCURL: error attaching to curl multi handle: %s\n", + curl_multi_strerror(msta)); + } + IO->HttpReq.attached = 1; ev_async_send (event_base, &WakeupCurl); - ev_cleanup_init(&IO->abort_by_shutdown, + ev_cleanup_init(&IO->abort_by_shutdown, IOcurl_abort_shutdown_callback); + ev_cleanup_start(event_base, &IO->abort_by_shutdown); return eReadMessage; } static void WakeupCurlCallback(EV_P_ ev_async *w, int revents) { - syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n"); + CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n"); curl_multi_perform(&global, CURL_POLL_NONE); } @@ -460,7 +562,7 @@ static void evcurl_shutdown (void) { curl_global_cleanup(); curl_multi_cleanup(global.mhnd); - syslog(LOG_DEBUG, "client_event_thread() initializing\n"); + CURLM_syslog(LOG_DEBUG, "exiting\n"); } /***************************************************************************** * libevent integration * @@ -476,14 +578,15 @@ HashList *QueueEvents = NULL; HashList *InboundEventQueue = NULL; HashList *InboundEventQueues[2] = { NULL, NULL }; -ev_async AddJob; +ev_async AddJob; ev_async ExitEventLoop; static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) { + ev_tstamp Now; HashList *q; void *v; - HashPos *It; + HashPos*It; long len; const char *Key; @@ -499,13 +602,17 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) q = InboundEventQueues[1]; } pthread_mutex_unlock(&EventQueueMutex); - + Now = ev_now (event_base); It = GetNewHashPos(q, 0); while (GetNextHashPos(q, It, &len, &Key, &v)) { IOAddHandler *h = v; - if (h->IO->ID == 0) + if (h->IO->ID == 0) { h->IO->ID = EvIDSource++; + } + if (h->IO->StartIO == 0.0) + h->IO->StartIO = Now; + h->IO->Now = Now; h->EvAttch(h->IO); } DeleteHashPos(&It); @@ -530,7 +637,9 @@ void InitEventQueue(void) pthread_mutex_init(&EventQueueMutex, NULL); if (pipe(event_add_pipe) != 0) { - syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno)); + syslog(LOG_EMERG, + "Unable to create pipe for libev queueing: %s\n", + strerror(errno)); abort(); } LimitSet.rlim_cur = 1; @@ -542,10 +651,11 @@ void InitEventQueue(void) InboundEventQueues[1] = NewHash(1, Flathash); InboundEventQueue = InboundEventQueues[0]; } +extern void CtdlDestroyEVCleanupHooks(void); + +extern int EVQShutDown; /* * this thread operates the select() etc. via libev. - * - * */ void *client_event_thread(void *arg) { @@ -577,10 +687,16 @@ void *client_event_thread(void *arg) DeleteHash(&InboundEventQueues[1]); /* citthread_mutex_destroy(&EventQueueMutex); TODO */ evcurl_shutdown(); + close(event_add_pipe[0]); + close(event_add_pipe[1]); + CtdlDestroyEVCleanupHooks(); + + EVQShutDown = 1; return(NULL); } -/*------------------------------------------------------------------------------*/ + +/*----------------------------------------------------------------------------*/ /* * DB-Queue; does async bdb operations. * has its own set of handlers. @@ -593,16 +709,17 @@ HashList *DBQueueEvents = NULL; HashList *DBInboundEventQueue = NULL; HashList *DBInboundEventQueues[2] = { NULL, NULL }; -ev_async DBAddJob; +ev_async DBAddJob; ev_async DBExitEventLoop; extern void ShutDownDBCLient(AsyncIO *IO); static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) { + ev_tstamp Now; HashList *q; void *v; - HashPos *It; + HashPos *It; long len; const char *Key; @@ -619,6 +736,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) } pthread_mutex_unlock(&DBEventQueueMutex); + Now = ev_now (event_db); It = GetNewHashPos(q, 0); while (GetNextHashPos(q, It, &len, &Key, &v)) { @@ -626,13 +744,16 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) eNextState rc; if (h->IO->ID == 0) h->IO->ID = EvIDSource++; + if (h->IO->StartDB == 0.0) + h->IO->StartDB = Now; + h->IO->Now = Now; rc = h->EvAttch(h->IO); switch (rc) { case eAbort: - ShutDownDBCLient(h->IO); + ShutDownDBCLient(h->IO); default: - break; + break; } } DeleteHashPos(&It); @@ -671,10 +792,8 @@ void DBInitEventQueue(void) /* * this thread operates writing to the message database via libev. - * - * */ -void *db_event_thread(void *arg) +void *db_event_thread(void *arg) { struct CitContext libev_msg_CC; @@ -700,6 +819,9 @@ void *db_event_thread(void *arg) DBInboundEventQueue = NULL; DeleteHash(&DBInboundEventQueues[0]); DeleteHash(&DBInboundEventQueues[1]); + + close(evdb_add_pipe[0]); + close(evdb_add_pipe[1]); /* citthread_mutex_destroy(&DBEventQueueMutex); TODO */ return(NULL); @@ -718,16 +840,26 @@ void ShutDownEventQueues(void) pthread_mutex_unlock(&EventQueueMutex); } +void DebugEventloopEnable(void) +{ + DebugEventLoop = 1; +} + +void DebugCurlEnable(void) +{ + DebugCurl = 1; +} + CTDL_MODULE_INIT(event_client) { if (!threading) { - CtdlRegisterCleanupHook(ShutDownEventQueues); + CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable); + CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable); InitEventQueue(); DBInitEventQueue(); - CtdlThreadCreate(/*"Client event", */ client_event_thread); - CtdlThreadCreate(/*"DB event", */db_event_thread); -/// todo register shutdown callback. + CtdlThreadCreate(client_event_thread); + CtdlThreadCreate(db_event_thread); } return "event"; }