X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Feventclient%2Fserv_eventclient.c;h=f7e286a9ab14cf6188f352b52ac036224df99982;hb=8357d67fb22adec3b854d61bdbd898dcfcc91959;hp=7b6a8c3dd27e8ca77ea6d3c668a617128c572b2e;hpb=2f6ccb7ba59f1aa4de0170ae2c8687cbfad15ce0;p=citadel.git diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 7b6a8c3dd..f7e286a9a 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -1,19 +1,19 @@ /* - * Copyright (c) 1998-2009 by the citadel.org team + * Copyright (c) 1998-2012 by the citadel.org team * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. + * This program is open source software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 3. + * + * * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * + * */ #include "sysdep.h" @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -55,195 +56,379 @@ #include "ctdl_module.h" -#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT - #include "event_client.h" #include "serv_curl.h" -int event_add_pipe[2] = {-1, -1}; - -citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */ -HashList *QueueEvents = NULL; +ev_loop *event_base; +int DebugEventLoop = 0; +int DebugEventLoopBacktrace = 0; +int DebugCurl = 0; +pthread_key_t evConKey; -HashList *InboundEventQueue = NULL; -HashList *InboundEventQueues[2] = { NULL, NULL }; +long EvIDSource = 1; +/***************************************************************************** + * libevent / curl integration * + *****************************************************************************/ +#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0)) -struct ev_loop *event_base; +#define EVCURL_syslog(LEVEL, FORMAT, ...) \ + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT, \ + IOSTR, IO->ID, CCID, __VA_ARGS__) -ev_async AddJob; -ev_async ExitEventLoop; -ev_async WakeupCurl; +#define EVCURLM_syslog(LEVEL, FORMAT) \ + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT, \ + IOSTR, IO->ID, CCID) -extern struct ev_loop *event_base; +#define CURL_syslog(LEVEL, FORMAT, ...) \ + DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__) -void SockStateCb(void *data, int sock, int read, int write); +#define CURLM_syslog(LEVEL, FORMAT) \ + DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT) #define MOPT(s, v) \ do { \ sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \ if (sta) { \ - CtdlLogPrintf(CTDL_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_easy_strerror(sta)); \ + EVQ_syslog(LOG_ERR, "error setting option " \ + #s " on curl multi handle: %s\n", \ + curl_easy_strerror(sta)); \ exit (1); \ } \ } while (0) +typedef struct _evcurl_global_data { + int magic; + CURLM *mhnd; + ev_timer timeev; + int nrun; +} evcurl_global_data; -/***************************************************************************** - * libevent / curl integration * - *****************************************************************************/ - - +ev_async WakeupCurl; evcurl_global_data global; +eNextState QueueAnDBOperation(AsyncIO *IO); + static void -gotstatus(evcurl_global_data *global, int nnrun) +gotstatus(int nnrun) { - CURLM *mhnd; CURLMsg *msg; int nmsg; -/* - if (EVCURL_GLOBAL_MAGIC != global.magic) - { - CtdlLogPrintf(CTDL_ERR, "internal error: gotstatus on wrong struct"); - return; - } -*/ - global->nrun = nnrun; - mhnd = global->mhnd; - - CtdlLogPrintf(CTDL_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n"); - while ((msg = curl_multi_info_read(mhnd, &nmsg))) { - CtdlLogPrintf(CTDL_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg); + + global.nrun = nnrun; + + CURLM_syslog(LOG_DEBUG, + "gotstatus(): about to call curl_multi_info_read\n"); + while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) { + CURL_syslog(LOG_DEBUG, + "got curl multi_info message msg=%d\n", + msg->msg); + if (CURLMSG_DONE == msg->msg) { - CtdlLogPrintf(CTDL_ERR, "EVCURL: request complete\n"); - CURL *chnd = msg->easy_handle; - char *chandle = NULL;; - CURLcode sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle); - if (sta) - CtdlLogPrintf(CTDL_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta)); - evcurl_request_data *handle = (void *)chandle; - if (global != handle->global || chnd != handle->chnd) - CtdlLogPrintf(CTDL_ERR, "EVCURL: internal evcurl error: unknown curl handle completed\n"); + CURL *chnd; + void *chandle = NULL; + CURLcode sta; + CURLMcode msta; + AsyncIO*IO; + + chandle = NULL;; + chnd = msg->easy_handle; + sta = curl_easy_getinfo(chnd, + CURLINFO_PRIVATE, + &chandle); + if (sta) { + syslog(LOG_ERR, + "error asking curl for private" + " cookie of curl handle: %s\n", + curl_easy_strerror(sta)); + continue; + } + IO = (AsyncIO *)chandle; + if (IO->ID == 0) { + EVCURL_syslog(LOG_ERR, + "Error, invalid IO context %p\n", + IO); + continue; + } + SetEVState(IO, eCurlGotStatus); + + EVCURLM_syslog(LOG_DEBUG, "request complete\n"); + + IO->Now = ev_now(event_base); + + ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); + sta = msg->data.result; if (sta) { - CtdlLogPrintf(CTDL_ERR, "EVCURL: error description: %s\n", handle->errdesc); - CtdlLogPrintf(CTDL_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta)); + EVCURL_syslog(LOG_ERR, + "error description: %s\n", + IO->HttpReq.errdesc); + IO->HttpReq.CurlError = curl_easy_strerror(sta); + EVCURL_syslog(LOG_ERR, + "error performing request: %s\n", + IO->HttpReq.CurlError); + if (sta == CURLE_OPERATION_TIMEDOUT) + { + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; + } } - long httpcode; - sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &httpcode); + sta = curl_easy_getinfo(chnd, + CURLINFO_RESPONSE_CODE, + &IO->HttpReq.httpcode); if (sta) - CtdlLogPrintf(CTDL_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta)); - CtdlLogPrintf(CTDL_ERR, "EVCURL: http response code was %ld\n", (long)httpcode); - CURLMcode msta = curl_multi_remove_handle(mhnd, chnd); + EVCURL_syslog(LOG_ERR, + "error asking curl for " + "response code from request: %s\n", + curl_easy_strerror(sta)); + EVCURL_syslog(LOG_DEBUG, + "http response code was %ld\n", + (long)IO->HttpReq.httpcode); + + + curl_slist_free_all(IO->HttpReq.headers); + msta = curl_multi_remove_handle(global.mhnd, chnd); if (msta) - CtdlLogPrintf(CTDL_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta)); - handle->attached = 0; + EVCURL_syslog(LOG_ERR, + "warning problem detaching " + "completed handle from curl multi: " + "%s\n", + curl_multi_strerror(msta)); + + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + + IO->HttpReq.attached = 0; + switch(IO->SendDone(IO)) + { + case eDBQuery: + FreeURL(&IO->ConnectMe); + QueueAnDBOperation(IO); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eSendReply: + case eSendMore: + case eSendFile: + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + break; + case eTerminateConnection: + case eAbort: + curl_easy_cleanup(IO->HttpReq.chnd); + IO->HttpReq.chnd = NULL; + FreeStrBuf(&IO->HttpReq.ReplyData); + FreeURL(&IO->ConnectMe); + RemoveContext(IO->CitContext); + IO->Terminate(IO); + } } } } static void -stepmulti(evcurl_global_data *global, curl_socket_t fd) { - int nnrun; - CURLMcode msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun); - CtdlLogPrintf(CTDL_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n"); +stepmulti(void *data, curl_socket_t fd, int which) +{ + int running_handles = 0; + CURLMcode msta; + + msta = curl_multi_socket_action(global.mhnd, + fd, + which, + &running_handles); + + CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n"); if (msta) - CtdlLogPrintf(CTDL_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta)); - if (global->nrun != nnrun) - gotstatus(global, nnrun); + CURL_syslog(LOG_ERR, + "error in curl processing events" + "on multi handle, fd %d: %s\n", + (int)fd, + curl_multi_strerror(msta)); + + if (global.nrun != running_handles) + gotstatus(running_handles); } static void -gottime(struct ev_loop *loop, ev_timer *timeev, int events) { - CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl for timeout\n"); - evcurl_global_data *global = (void *)timeev->data; - stepmulti(global, CURL_SOCKET_TIMEOUT); +gottime(struct ev_loop *loop, ev_timer *timeev, int events) +{ + CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n"); + stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0); +} + +static void +got_in(struct ev_loop *loop, ev_io *ioev, int events) +{ + CURL_syslog(LOG_DEBUG, + "EVCURL: waking up curl for io on fd %d\n", + (int)ioev->fd); + + stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN); } static void -gotio(struct ev_loop *loop, ev_io *ioev, int events) { - CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd); - sockwatcher_data *sockwatcher = (void *)ioev->data; - stepmulti(sockwatcher->global, ioev->fd); +got_out(struct ev_loop *loop, ev_io *ioev, int events) +{ + CURL_syslog(LOG_DEBUG, + "waking up curl for io on fd %d\n", + (int)ioev->fd); + + stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT); } static size_t -gotdata(void *data, size_t size, size_t nmemb, void *cglobal) { - evcurl_request_data *D = (evcurl_request_data*) data; - CtdlLogPrintf(CTDL_DEBUG, "EVCURL: gotdata(): calling CurlFillStrBuf_callback()\n"); - return CurlFillStrBuf_callback(D->ReplyData, size, nmemb, cglobal); +gotdata(void *data, size_t size, size_t nmemb, void *cglobal) +{ + AsyncIO *IO = (AsyncIO*) cglobal; + + SetEVState(IO, eCurlGotData); + if (IO->HttpReq.ReplyData == NULL) + { + IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ); + } + IO->Now = ev_now(event_base); + return CurlFillStrBuf_callback(data, + size, + nmemb, + IO->HttpReq.ReplyData); } static int gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) { - CtdlLogPrintf(CTDL_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms); + CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms); evcurl_global_data *global = cglobal; ev_timer_stop(EV_DEFAULT, &global->timeev); if (tblock_ms < 0 || 14000 < tblock_ms) tblock_ms = 14000; ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0); ev_timer_start(EV_DEFAULT_UC, &global->timeev); - curl_multi_perform(global, CURL_POLL_NONE); + curl_multi_perform(global, &global->nrun); return 0; } static int -gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *csockwatcher) { +gotwatchsock(CURL *easy, + curl_socket_t fd, + int action, + void *cglobal, + void *vIO) +{ evcurl_global_data *global = cglobal; CURLM *mhnd = global->mhnd; - sockwatcher_data *sockwatcher = csockwatcher; + void *f; + AsyncIO *IO = (AsyncIO*) vIO; + CURLcode sta; + const char *Action; + + if (IO == NULL) { + sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); + if (sta) { + CURL_syslog(LOG_ERR, + "EVCURL: error asking curl for private " + "cookie of curl handle: %s\n", + curl_easy_strerror(sta)); + return -1; + } + IO = (AsyncIO *) f; + SetEVState(IO, eCurlNewIO); + EVCURL_syslog(LOG_DEBUG, + "EVCURL: got socket for URL: %s\n", + IO->ConnectMe->PlainUrl); + + if (IO->SendBuf.fd != 0) + { + ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); + } + IO->SendBuf.fd = fd; + ev_io_init(&IO->recv_event, &got_in, fd, EV_READ); + ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE); + curl_multi_assign(mhnd, fd, IO); + } - CtdlLogPrintf(CTDL_DEBUG,"EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action); + SetEVState(IO, eCurlGotIO); + IO->Now = ev_now(event_base); - if (!sockwatcher) { - CtdlLogPrintf(CTDL_ERR,"EVCURL: called first time to register this sockwatcker\n"); - sockwatcher = malloc(sizeof(sockwatcher_data)); - sockwatcher->global = global; - ev_init(&sockwatcher->ioev, &gotio); - sockwatcher->ioev.data = (void *)sockwatcher; - curl_multi_assign(mhnd, fd, sockwatcher); + Action = ""; + switch (action) + { + case CURL_POLL_NONE: + Action = "CURL_POLL_NONE"; + break; + case CURL_POLL_REMOVE: + Action = "CURL_POLL_REMOVE"; + break; + case CURL_POLL_IN: + Action = "CURL_POLL_IN"; + break; + case CURL_POLL_OUT: + Action = "CURL_POLL_OUT"; + break; + case CURL_POLL_INOUT: + Action = "CURL_POLL_INOUT"; + break; } - if (CURL_POLL_REMOVE == action) { - CtdlLogPrintf(CTDL_ERR,"EVCURL: called last time to unregister this sockwatcher\n"); - free(sockwatcher); - } else { - int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0); - ev_io_stop(EV_DEFAULT, &sockwatcher->ioev); - if (events) { - ev_io_set(&sockwatcher->ioev, fd, events); - ev_io_start(EV_DEFAULT, &sockwatcher->ioev); - } + + + EVCURL_syslog(LOG_DEBUG, + "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", + (int)fd, Action, action); + + switch (action) + { + case CURL_POLL_NONE: + EVCURLM_syslog(LOG_DEBUG, + "called first time " + "to register this sockwatcker\n"); + break; + case CURL_POLL_REMOVE: + EVCURLM_syslog(LOG_DEBUG, + "called last time to unregister " + "this sockwatcher\n"); + ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); + break; + case CURL_POLL_IN: + ev_io_start(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); + break; + case CURL_POLL_OUT: + ev_io_start(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + break; + case CURL_POLL_INOUT: + ev_io_start(event_base, &IO->send_event); + ev_io_start(event_base, &IO->recv_event); + break; } return 0; } -void curl_init_connectionpool(void) +void curl_init_connectionpool(void) { CURLM *mhnd ; -// global.magic = EVCURL_GLOBAL_MAGIC; ev_timer_init(&global.timeev, &gottime, 14.0, 14.0); global.timeev.data = (void *)&global; global.nrun = -1; CURLcode sta = curl_global_init(CURL_GLOBAL_ALL); - if (sta) + if (sta) { - CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta)); + CURL_syslog(LOG_ERR, + "error initializing curl library: %s\n", + curl_easy_strerror(sta)); + exit(1); } -/* - if (!ev_default_loop(EVFLAG_AUTO)) - { - CtdlLogPrintf(CTDL_ERR,"error initializing libev\n"); - exit(2); - } -*/ mhnd = global.mhnd = curl_multi_init(); if (!mhnd) { - CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl multi handle\n"); + CURLM_syslog(LOG_ERR, + "error initializing curl multi handle\n"); exit(3); } @@ -252,123 +437,196 @@ void curl_init_connectionpool(void) MOPT(TIMERFUNCTION, &gotwatchtime); MOPT(TIMERDATA, (void *)&global); - /* well, just there to fire the sample request?*/ -/// ev_timer_start(EV_DEFAULT, &global.timeev); return; } - - - -int evcurl_init(evcurl_request_data *handle, - void *CustomData, - const char* Desc, - int CallBack) +int evcurl_init(AsyncIO *IO) { CURLcode sta; CURL *chnd; - CtdlLogPrintf(CTDL_DEBUG,"EVCURL: evcurl_init called ms\n"); - handle->global = &global; - handle->attached = 0; - chnd = handle->chnd = curl_easy_init(); + EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n"); + IO->HttpReq.attached = 0; + chnd = IO->HttpReq.chnd = curl_easy_init(); if (!chnd) { - CtdlLogPrintf(CTDL_ERR, "EVCURL: error initializing curl handle\n"); - return 1; + EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n"); + return 0; } - strcpy(handle->errdesc, Desc); - +#if DEBUG OPT(VERBOSE, (long)1); - /* unset in production */ - OPT(NOPROGRESS, (long)1); - OPT(NOSIGNAL, (long)1); +#endif + OPT(NOPROGRESS, 1L); + + OPT(NOSIGNAL, 1L); OPT(FAILONERROR, (long)1); OPT(ENCODING, ""); - OPT(FOLLOWLOCATION, (long)1); - OPT(MAXREDIRS, (long)7); + OPT(FOLLOWLOCATION, (long)0); + OPT(MAXREDIRS, (long)0); OPT(USERAGENT, CITADEL); OPT(TIMEOUT, (long)1800); OPT(LOW_SPEED_LIMIT, (long)64); OPT(LOW_SPEED_TIME, (long)600); - OPT(CONNECTTIMEOUT, (long)600); - OPT(PRIVATE, (void *)handle); - - - OPT(WRITEFUNCTION, &gotdata); - OPT(WRITEDATA, (void *)handle); - OPT(ERRORBUFFER, handle->errdesc); - - /* point to a structure that points back to the perl structure and stuff */ - CtdlLogPrintf(CTDL_DEBUG, "EVCURL: Loading URL: %s\n", handle->URL->PlainUrl); - OPT(URL, handle->URL->PlainUrl); - if (StrLength(handle->URL->CurlCreds)) + OPT(CONNECTTIMEOUT, (long)600); + OPT(PRIVATE, (void *)IO); + + OPT(FORBID_REUSE, 1); + OPT(WRITEFUNCTION, &gotdata); + OPT(WRITEDATA, (void *)IO); + OPT(ERRORBUFFER, IO->HttpReq.errdesc); + + if ((!IsEmptyStr(config.c_ip_addr)) + && (strcmp(config.c_ip_addr, "*")) + && (strcmp(config.c_ip_addr, "::")) + && (strcmp(config.c_ip_addr, "0.0.0.0")) + ) { - OPT(HTTPAUTH, (long)CURLAUTH_BASIC); - OPT(USERPWD, ChrPtr(handle->URL->CurlCreds)); + OPT(INTERFACE, config.c_ip_addr); } + #ifdef CURLOPT_HTTP_CONTENT_DECODING OPT(HTTP_CONTENT_DECODING, 1); OPT(ENCODING, ""); #endif - if (StrLength(handle->PostData) > 0) - { - OPT(POSTFIELDS, ChrPtr(handle->PostData)); - OPT(POSTFIELDSIZE, StrLength(handle->PostData)); - } - else if ((handle->PlainPostDataLen != 0) && (handle->PlainPostData != NULL)) - { - OPT(POSTFIELDS, handle->PlainPostData); - OPT(POSTFIELDSIZE, handle->PlainPostDataLen); - } - - if (handle->headers != NULL) - OPT(HTTPHEADER, handle->headers); + IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, + "Connection: close"); return 1; } -void -evcurl_handle_start(evcurl_request_data *handle) + +static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) { CURLMcode msta; - - CtdlLogPrintf(CTDL_DEBUG, "EVCURL: attaching to curl multi handle\n"); - msta = curl_multi_add_handle(handle->global->mhnd, handle->chnd); + AsyncIO *IO = watcher->data; + + if (IO == NULL) + return; + + SetEVState(IO, eCurlShutdown); + IO->Now = ev_now(event_base); + EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); + + curl_slist_free_all(IO->HttpReq.headers); + msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd); if (msta) - CtdlLogPrintf(CTDL_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta)); - handle->attached = 1; -// ev_timer_start(EV_DEFAULT, &global.timeev); + { + EVCURL_syslog(LOG_ERR, + "EVCURL: warning problem detaching completed handle " + "from curl multi: %s\n", + curl_multi_strerror(msta)); + } + + curl_easy_cleanup(IO->HttpReq.chnd); + IO->HttpReq.chnd = NULL; + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + ev_io_stop(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} +eNextState +evcurl_handle_start(AsyncIO *IO) +{ + CURLMcode msta; + CURLcode sta; + CURL *chnd; + + SetEVState(IO, eCurlStart); + chnd = IO->HttpReq.chnd; + EVCURL_syslog(LOG_DEBUG, + "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); + OPT(URL, IO->ConnectMe->PlainUrl); + if (StrLength(IO->ConnectMe->CurlCreds)) + { + OPT(HTTPAUTH, (long)CURLAUTH_BASIC); + OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds)); + } + if (StrLength(IO->HttpReq.PostData) > 0) + { + OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData)); + OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData)); + + } + else if ((IO->HttpReq.PlainPostDataLen != 0) && + (IO->HttpReq.PlainPostData != NULL)) + { + OPT(POSTFIELDS, IO->HttpReq.PlainPostData); + OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen); + } + OPT(HTTPHEADER, IO->HttpReq.headers); + + IO->NextState = eConnect; + EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n"); + msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd); + if (msta) + { + EVCURL_syslog(LOG_ERR, + "EVCURL: error attaching to curl multi handle: %s\n", + curl_multi_strerror(msta)); + } + + IO->HttpReq.attached = 1; ev_async_send (event_base, &WakeupCurl); + + ev_cleanup_init(&IO->abort_by_shutdown, + IOcurl_abort_shutdown_callback); + + ev_cleanup_start(event_base, &IO->abort_by_shutdown); + + return eReadMessage; } static void WakeupCurlCallback(EV_P_ ev_async *w, int revents) { -/// evcurl_global_data *global = cglobal; - - CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl multi handle\n"); + CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n"); curl_multi_perform(&global, CURL_POLL_NONE); } +static void evcurl_shutdown (void) +{ + curl_global_cleanup(); + curl_multi_cleanup(global.mhnd); + CURLM_syslog(LOG_DEBUG, "exiting\n"); +} /***************************************************************************** * libevent integration * *****************************************************************************/ +/* + * client event queue plus its methods. + * this currently is the main loop (which may change in some future?) + */ +int evbase_count = 0; +pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */ +HashList *QueueEvents = NULL; +HashList *InboundEventQueue = NULL; +HashList *InboundEventQueues[2] = { NULL, NULL }; +extern void ShutDownCLient(AsyncIO *IO); +ev_async AddJob; +ev_async ExitEventLoop; static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) { + CitContext *Ctx; + long IOID = -1; + long count = 0; + ev_tstamp Now; HashList *q; void *v; - HashPos *It; + HashPos*It; long len; const char *Key; /* get the control command... */ - citthread_mutex_lock(&EventQueueMutex); + pthread_mutex_lock(&EventQueueMutex); if (InboundEventQueues[0] == InboundEventQueue) { InboundEventQueue = InboundEventQueues[1]; @@ -378,63 +636,89 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) InboundEventQueue = InboundEventQueues[0]; q = InboundEventQueues[1]; } - citthread_mutex_unlock(&EventQueueMutex); - + pthread_mutex_unlock(&EventQueueMutex); + Now = ev_now (event_base); It = GetNewHashPos(q, 0); while (GetNextHashPos(q, It, &len, &Key, &v)) { IOAddHandler *h = v; - h->EvAttch(h->IO); + count ++; + if (h->IO->ID == 0) { + h->IO->ID = EvIDSource++; + } + IOID = h->IO->ID; + if (h->IO->StartIO == 0.0) + h->IO->StartIO = Now; + + SetEVState(h->IO, eIOAttach); + + Ctx = h->IO->CitContext; + become_session(Ctx); + + h->IO->Now = Now; + switch (h->EvAttch(h->IO)) + { + case eReadMore: + case eReadMessage: + case eReadFile: + case eSendReply: + case eSendMore: + case eReadPayload: + case eSendFile: + case eDBQuery: + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + break; + case eTerminateConnection: + case eAbort: + ShutDownCLient(h->IO); + break; + } } DeleteHashPos(&It); DeleteHashContent(&q); - CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n"); + EVQ_syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld done.", IOSTR, IOID, count); } static void EventExitCallback(EV_P_ ev_async *w, int revents) { - ev_unloop(event_base, EVUNLOOP_ALL); + ev_break(event_base, EVBREAK_ALL); - CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n"); + EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); } void InitEventQueue(void) { - struct rlimit LimitSet; - - citthread_mutex_init(&EventQueueMutex, NULL); - - if (pipe(event_add_pipe) != 0) { - CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno)); - abort(); - } - LimitSet.rlim_cur = 1; - LimitSet.rlim_max = 1; - setrlimit(event_add_pipe[1], &LimitSet); + pthread_mutex_init(&EventQueueMutex, NULL); + pthread_mutex_init(&EventExitQueueMutex, NULL); QueueEvents = NewHash(1, Flathash); InboundEventQueues[0] = NewHash(1, Flathash); InboundEventQueues[1] = NewHash(1, Flathash); InboundEventQueue = InboundEventQueues[0]; } +extern void CtdlDestroyEVCleanupHooks(void); + +extern int EVQShutDown; +const char *IOLog = "IO"; /* * this thread operates the select() etc. via libev. - * - * */ void *client_event_thread(void *arg) { - struct CitContext libevent_client_CC; + struct CitContext libev_client_CC; - CtdlFillSystemContext(&libevent_client_CC, "LibEv Thread"); -// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); - CtdlLogPrintf(CTDL_DEBUG, "client_ev_thread() initializing\n"); + CtdlFillSystemContext(&libev_client_CC, "LibEv Thread"); - event_base = ev_default_loop (EVFLAG_AUTO); + pthread_setspecific(evConKey, IOLog); + EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n"); + + event_base = ev_default_loop (EVFLAG_AUTO); ev_async_init(&AddJob, QueueEventAddCallback); ev_async_start(event_base, &AddJob); ev_async_init(&ExitEventLoop, EventExitCallback); @@ -444,33 +728,209 @@ void *client_event_thread(void *arg) curl_init_connectionpool(); - ev_loop (event_base, 0); + ev_run (event_base, 0); + EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n"); - CtdlClearSystemContext(); - ev_default_destroy (); - +///what todo here? CtdlClearSystemContext(); + pthread_mutex_lock(&EventExitQueueMutex); + ev_loop_destroy (EV_DEFAULT_UC); + event_base = NULL; DeleteHash(&QueueEvents); InboundEventQueue = NULL; DeleteHash(&InboundEventQueues[0]); DeleteHash(&InboundEventQueues[1]); - citthread_mutex_destroy(&EventQueueMutex); +/* citthread_mutex_destroy(&EventQueueMutex); TODO */ + evcurl_shutdown(); + CtdlDestroyEVCleanupHooks(); + pthread_mutex_unlock(&EventExitQueueMutex); + EVQShutDown = 1; return(NULL); } -#endif +/*----------------------------------------------------------------------------*/ +/* + * DB-Queue; does async bdb operations. + * has its own set of handlers. + */ +ev_loop *event_db; +int evdb_count = 0; +pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */ +HashList *DBQueueEvents = NULL; +HashList *DBInboundEventQueue = NULL; +HashList *DBInboundEventQueues[2] = { NULL, NULL }; + +ev_async DBAddJob; +ev_async DBExitEventLoop; + +extern void ShutDownDBCLient(AsyncIO *IO); + +static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) +{ + CitContext *Ctx; + long IOID = -1; + long count = 0;; + ev_tstamp Now; + HashList *q; + void *v; + HashPos *It; + long len; + const char *Key; + + /* get the control command... */ + pthread_mutex_lock(&DBEventQueueMutex); + + if (DBInboundEventQueues[0] == DBInboundEventQueue) { + DBInboundEventQueue = DBInboundEventQueues[1]; + q = DBInboundEventQueues[0]; + } + else { + DBInboundEventQueue = DBInboundEventQueues[0]; + q = DBInboundEventQueues[1]; + } + pthread_mutex_unlock(&DBEventQueueMutex); + + Now = ev_now (event_db); + It = GetNewHashPos(q, 0); + while (GetNextHashPos(q, It, &len, &Key, &v)) + { + IOAddHandler *h = v; + eNextState rc; + count ++; + if (h->IO->ID == 0) + h->IO->ID = EvIDSource++; + IOID = h->IO->ID; + if (h->IO->StartDB == 0.0) + h->IO->StartDB = Now; + h->IO->Now = Now; + + SetEVState(h->IO, eDBAttach); + Ctx = h->IO->CitContext; + become_session(Ctx); + ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown); + rc = h->EvAttch(h->IO); + switch (rc) + { + case eAbort: + ShutDownDBCLient(h->IO); + default: + break; + } + } + DeleteHashPos(&It); + DeleteHashContent(&q); + EVQ_syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count); +} + + +static void DBEventExitCallback(EV_P_ ev_async *w, int revents) +{ + EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n"); + ev_break(event_db, EVBREAK_ALL); +} + + + +void DBInitEventQueue(void) +{ + pthread_mutex_init(&DBEventQueueMutex, NULL); + pthread_mutex_init(&DBEventExitQueueMutex, NULL); + + DBQueueEvents = NewHash(1, Flathash); + DBInboundEventQueues[0] = NewHash(1, Flathash); + DBInboundEventQueues[1] = NewHash(1, Flathash); + DBInboundEventQueue = DBInboundEventQueues[0]; +} + +const char *DBLog = "BD"; + +/* + * this thread operates writing to the message database via libev. + */ +void *db_event_thread(void *arg) +{ + ev_loop *tmp; + struct CitContext libev_msg_CC; + + pthread_setspecific(evConKey, DBLog); + + CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread"); + + EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n"); + + tmp = event_db = ev_loop_new (EVFLAG_AUTO); + + ev_async_init(&DBAddJob, DBQueueEventAddCallback); + ev_async_start(event_db, &DBAddJob); + ev_async_init(&DBExitEventLoop, DBEventExitCallback); + ev_async_start(event_db, &DBExitEventLoop); + ev_run (event_db, 0); + + pthread_mutex_lock(&DBEventExitQueueMutex); + + event_db = NULL; + EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n"); + + DeleteHash(&DBQueueEvents); + DBInboundEventQueue = NULL; + DeleteHash(&DBInboundEventQueues[0]); + DeleteHash(&DBInboundEventQueues[1]); + +/* citthread_mutex_destroy(&DBEventQueueMutex); TODO */ + + ev_loop_destroy (tmp); + pthread_mutex_unlock(&DBEventExitQueueMutex); + return(NULL); +} + +void ShutDownEventQueues(void) +{ + EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n"); + + pthread_mutex_lock(&DBEventQueueMutex); + ev_async_send (event_db, &DBExitEventLoop); + pthread_mutex_unlock(&DBEventQueueMutex); + + pthread_mutex_lock(&EventQueueMutex); + ev_async_send (EV_DEFAULT_ &ExitEventLoop); + pthread_mutex_unlock(&EventQueueMutex); +} + +void DebugEventloopEnable(const int n) +{ + DebugEventLoop = n; +} +void DebugEventloopBacktraceEnable(const int n) +{ + DebugEventLoopBacktrace = n; +} + +void DebugCurlEnable(const int n) +{ + DebugCurl = n; +} + +const char *WLog = "WX"; CTDL_MODULE_INIT(event_client) { -#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT if (!threading) { + if (pthread_key_create(&evConKey, NULL) != 0) { + syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno)); + } + pthread_setspecific(evConKey, WLog); + + CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop); + CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace); + CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl); InitEventQueue(); - CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL); -/// todo register shutdown callback. + DBInitEventQueue(); + CtdlThreadCreate(client_event_thread); + CtdlThreadCreate(db_event_thread); } -#endif return "event"; }