From 2f6ccb7ba59f1aa4de0170ae2c8687cbfad15ce0 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sun, 15 May 2011 14:12:15 +0000 Subject: [PATCH] sending http requests instant now. --- .../modules/eventclient/serv_eventclient.c | 295 ++++++++++++++++++ citadel/modules/extnotify/extnotify.h | 2 +- citadel/modules/extnotify/extnotify_main.c | 1 + citadel/modules/extnotify/funambol65.c | 6 +- citadel/sysdep.c | 15 +- 5 files changed, 308 insertions(+), 11 deletions(-) diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 8f05df409..7b6a8c3dd 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -46,6 +46,8 @@ #include #include #include +#include +#include #include "citadel.h" #include "server.h" #include "citserver.h" @@ -56,6 +58,7 @@ #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT #include "event_client.h" +#include "serv_curl.h" int event_add_pipe[2] = {-1, -1}; @@ -69,6 +72,292 @@ struct ev_loop *event_base; ev_async AddJob; ev_async ExitEventLoop; +ev_async WakeupCurl; + +extern struct ev_loop *event_base; + +void SockStateCb(void *data, int sock, int read, int write); + +#define MOPT(s, v) \ + do { \ + sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \ + if (sta) { \ + CtdlLogPrintf(CTDL_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_easy_strerror(sta)); \ + exit (1); \ + } \ + } while (0) + + + +/***************************************************************************** + * libevent / curl integration * + *****************************************************************************/ + + +evcurl_global_data global; + +static void +gotstatus(evcurl_global_data *global, int nnrun) +{ + CURLM *mhnd; + CURLMsg *msg; + int nmsg; +/* + if (EVCURL_GLOBAL_MAGIC != global.magic) + { + CtdlLogPrintf(CTDL_ERR, "internal error: gotstatus on wrong struct"); + return; + } +*/ + global->nrun = nnrun; + mhnd = global->mhnd; + + CtdlLogPrintf(CTDL_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n"); + while ((msg = curl_multi_info_read(mhnd, &nmsg))) { + CtdlLogPrintf(CTDL_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg); + if (CURLMSG_DONE == msg->msg) { + CtdlLogPrintf(CTDL_ERR, "EVCURL: request complete\n"); + CURL *chnd = msg->easy_handle; + char *chandle = NULL;; + CURLcode sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle); + if (sta) + CtdlLogPrintf(CTDL_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta)); + evcurl_request_data *handle = (void *)chandle; + if (global != handle->global || chnd != handle->chnd) + CtdlLogPrintf(CTDL_ERR, "EVCURL: internal evcurl error: unknown curl handle completed\n"); + sta = msg->data.result; + if (sta) { + CtdlLogPrintf(CTDL_ERR, "EVCURL: error description: %s\n", handle->errdesc); + CtdlLogPrintf(CTDL_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta)); + } + long httpcode; + sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &httpcode); + if (sta) + CtdlLogPrintf(CTDL_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta)); + CtdlLogPrintf(CTDL_ERR, "EVCURL: http response code was %ld\n", (long)httpcode); + CURLMcode msta = curl_multi_remove_handle(mhnd, chnd); + if (msta) + CtdlLogPrintf(CTDL_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta)); + handle->attached = 0; + } + } +} + +static void +stepmulti(evcurl_global_data *global, curl_socket_t fd) { + int nnrun; + CURLMcode msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun); + CtdlLogPrintf(CTDL_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n"); + if (msta) + CtdlLogPrintf(CTDL_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta)); + if (global->nrun != nnrun) + gotstatus(global, nnrun); +} + +static void +gottime(struct ev_loop *loop, ev_timer *timeev, int events) { + CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl for timeout\n"); + evcurl_global_data *global = (void *)timeev->data; + stepmulti(global, CURL_SOCKET_TIMEOUT); +} + +static void +gotio(struct ev_loop *loop, ev_io *ioev, int events) { + CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd); + sockwatcher_data *sockwatcher = (void *)ioev->data; + stepmulti(sockwatcher->global, ioev->fd); +} + +static size_t +gotdata(void *data, size_t size, size_t nmemb, void *cglobal) { + evcurl_request_data *D = (evcurl_request_data*) data; + CtdlLogPrintf(CTDL_DEBUG, "EVCURL: gotdata(): calling CurlFillStrBuf_callback()\n"); + return CurlFillStrBuf_callback(D->ReplyData, size, nmemb, cglobal); +} + +static int +gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) { + CtdlLogPrintf(CTDL_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms); + evcurl_global_data *global = cglobal; + ev_timer_stop(EV_DEFAULT, &global->timeev); + if (tblock_ms < 0 || 14000 < tblock_ms) + tblock_ms = 14000; + ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0); + ev_timer_start(EV_DEFAULT_UC, &global->timeev); + curl_multi_perform(global, CURL_POLL_NONE); + return 0; +} + +static int +gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *csockwatcher) { + evcurl_global_data *global = cglobal; + CURLM *mhnd = global->mhnd; + sockwatcher_data *sockwatcher = csockwatcher; + + CtdlLogPrintf(CTDL_DEBUG,"EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action); + + if (!sockwatcher) { + CtdlLogPrintf(CTDL_ERR,"EVCURL: called first time to register this sockwatcker\n"); + sockwatcher = malloc(sizeof(sockwatcher_data)); + sockwatcher->global = global; + ev_init(&sockwatcher->ioev, &gotio); + sockwatcher->ioev.data = (void *)sockwatcher; + curl_multi_assign(mhnd, fd, sockwatcher); + } + if (CURL_POLL_REMOVE == action) { + CtdlLogPrintf(CTDL_ERR,"EVCURL: called last time to unregister this sockwatcher\n"); + free(sockwatcher); + } else { + int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0); + ev_io_stop(EV_DEFAULT, &sockwatcher->ioev); + if (events) { + ev_io_set(&sockwatcher->ioev, fd, events); + ev_io_start(EV_DEFAULT, &sockwatcher->ioev); + } + } + return 0; +} + +void curl_init_connectionpool(void) +{ + CURLM *mhnd ; +// global.magic = EVCURL_GLOBAL_MAGIC; + + ev_timer_init(&global.timeev, &gottime, 14.0, 14.0); + global.timeev.data = (void *)&global; + global.nrun = -1; + CURLcode sta = curl_global_init(CURL_GLOBAL_ALL); + + if (sta) + { + CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta)); + exit(1); + } +/* + if (!ev_default_loop(EVFLAG_AUTO)) + { + CtdlLogPrintf(CTDL_ERR,"error initializing libev\n"); + exit(2); + } +*/ + mhnd = global.mhnd = curl_multi_init(); + if (!mhnd) + { + CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl multi handle\n"); + exit(3); + } + + MOPT(SOCKETFUNCTION, &gotwatchsock); + MOPT(SOCKETDATA, (void *)&global); + MOPT(TIMERFUNCTION, &gotwatchtime); + MOPT(TIMERDATA, (void *)&global); + + /* well, just there to fire the sample request?*/ +/// ev_timer_start(EV_DEFAULT, &global.timeev); + return; +} + + + + +int evcurl_init(evcurl_request_data *handle, + void *CustomData, + const char* Desc, + int CallBack) +{ + CURLcode sta; + CURL *chnd; + + CtdlLogPrintf(CTDL_DEBUG,"EVCURL: evcurl_init called ms\n"); + handle->global = &global; + handle->attached = 0; + chnd = handle->chnd = curl_easy_init(); + if (!chnd) + { + CtdlLogPrintf(CTDL_ERR, "EVCURL: error initializing curl handle\n"); + return 1; + } + + strcpy(handle->errdesc, Desc); + + OPT(VERBOSE, (long)1); + /* unset in production */ + OPT(NOPROGRESS, (long)1); + OPT(NOSIGNAL, (long)1); + OPT(FAILONERROR, (long)1); + OPT(ENCODING, ""); + OPT(FOLLOWLOCATION, (long)1); + OPT(MAXREDIRS, (long)7); + OPT(USERAGENT, CITADEL); + + OPT(TIMEOUT, (long)1800); + OPT(LOW_SPEED_LIMIT, (long)64); + OPT(LOW_SPEED_TIME, (long)600); + OPT(CONNECTTIMEOUT, (long)600); + OPT(PRIVATE, (void *)handle); + + + OPT(WRITEFUNCTION, &gotdata); + OPT(WRITEDATA, (void *)handle); + OPT(ERRORBUFFER, handle->errdesc); + + /* point to a structure that points back to the perl structure and stuff */ + CtdlLogPrintf(CTDL_DEBUG, "EVCURL: Loading URL: %s\n", handle->URL->PlainUrl); + OPT(URL, handle->URL->PlainUrl); + if (StrLength(handle->URL->CurlCreds)) + { + OPT(HTTPAUTH, (long)CURLAUTH_BASIC); + OPT(USERPWD, ChrPtr(handle->URL->CurlCreds)); + } +#ifdef CURLOPT_HTTP_CONTENT_DECODING + OPT(HTTP_CONTENT_DECODING, 1); + OPT(ENCODING, ""); +#endif + if (StrLength(handle->PostData) > 0) + { + OPT(POSTFIELDS, ChrPtr(handle->PostData)); + OPT(POSTFIELDSIZE, StrLength(handle->PostData)); + + } + else if ((handle->PlainPostDataLen != 0) && (handle->PlainPostData != NULL)) + { + OPT(POSTFIELDS, handle->PlainPostData); + OPT(POSTFIELDSIZE, handle->PlainPostDataLen); + } + + if (handle->headers != NULL) + OPT(HTTPHEADER, handle->headers); + + return 1; +} + +void +evcurl_handle_start(evcurl_request_data *handle) +{ + CURLMcode msta; + + CtdlLogPrintf(CTDL_DEBUG, "EVCURL: attaching to curl multi handle\n"); + msta = curl_multi_add_handle(handle->global->mhnd, handle->chnd); + if (msta) + CtdlLogPrintf(CTDL_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta)); + handle->attached = 1; +// ev_timer_start(EV_DEFAULT, &global.timeev); + ev_async_send (event_base, &WakeupCurl); +} + +static void WakeupCurlCallback(EV_P_ ev_async *w, int revents) +{ +/// evcurl_global_data *global = cglobal; + + CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl multi handle\n"); + + curl_multi_perform(&global, CURL_POLL_NONE); +} + +/***************************************************************************** + * libevent integration * + *****************************************************************************/ + static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) { @@ -150,8 +439,14 @@ void *client_event_thread(void *arg) ev_async_start(event_base, &AddJob); ev_async_init(&ExitEventLoop, EventExitCallback); ev_async_start(event_base, &ExitEventLoop); + ev_async_init(&WakeupCurl, WakeupCurlCallback); + ev_async_start(event_base, &WakeupCurl); + + curl_init_connectionpool(); ev_loop (event_base, 0); + + CtdlClearSystemContext(); ev_default_destroy (); diff --git a/citadel/modules/extnotify/extnotify.h b/citadel/modules/extnotify/extnotify.h index 645f458f4..93a20dbcb 100644 --- a/citadel/modules/extnotify/extnotify.h +++ b/citadel/modules/extnotify/extnotify.h @@ -18,7 +18,7 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "../curl/serv_curl.h" +#include "../eventclient/serv_curl.h" #define FUNAMBOL_CONFIG_TEXT "funambol" #define PAGER_CONFIG_MESSAGE "__ Push email settings __" diff --git a/citadel/modules/extnotify/extnotify_main.c b/citadel/modules/extnotify/extnotify_main.c index 31f2f0932..7432a0ee2 100644 --- a/citadel/modules/extnotify/extnotify_main.c +++ b/citadel/modules/extnotify/extnotify_main.c @@ -63,6 +63,7 @@ #include "internet_addressing.h" #include "domain.h" #include "clientsocket.h" +#include "event_client.h" #include "extnotify.h" #include "ctdl_module.h" diff --git a/citadel/modules/extnotify/funambol65.c b/citadel/modules/extnotify/funambol65.c index 2e2276df1..7f63c6582 100644 --- a/citadel/modules/extnotify/funambol65.c +++ b/citadel/modules/extnotify/funambol65.c @@ -44,6 +44,7 @@ #include "msgbase.h" #include "ctdl_module.h" +#include "event_client.h" #include "extnotify.h" /* @@ -66,7 +67,8 @@ int notify_http_server(char *remoteurl, StrBuf *ReplyBuf; CURL *chnd; - return 0; + snprintf(msgnumstr, 128, "%ld", MsgNum); + if (tlen > 0) { /* Load the template message. Get mallocs done too */ FILE *Ftemplate = NULL; @@ -86,8 +88,6 @@ int notify_http_server(char *remoteurl, } mimetype = GuessMimeByFilename(template, tlen); - snprintf(msgnumstr, 128, "%ld", MsgNum); - buf = malloc(SIZ); memset(buf, 0, SIZ); SOAPMessage = malloc(3072); diff --git a/citadel/sysdep.c b/citadel/sysdep.c index 8715bc680..74330fe12 100644 --- a/citadel/sysdep.c +++ b/citadel/sysdep.c @@ -527,13 +527,14 @@ int client_write(const char *buf, int nbytes) snprintf(fn, SIZ, "/tmp/foolog_%s.%d", Ctx->ServiceName, Ctx->cs_pid); fd = fopen(fn, "a+"); - fprintf(fd, "Sending: BufSize: %d BufContent: [", - nbytes); - rv = fwrite(buf, nbytes, 1, fd); - fprintf(fd, "]\n"); - - - fclose(fd); + if (fd) + { + fprintf(fd, "Sending: BufSize: %d BufContent: [", + nbytes); + rv = fwrite(buf, nbytes, 1, fd); + fprintf(fd, "]\n"); + fclose(fd); + } } #endif // flush_client_inbuf(); -- 2.30.2