+++ /dev/null
-/*
- * Copyright (c) 1998-2015 by the citadel.org team
- *
- * This program is open source software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 3.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- */
-
-#include "sysdep.h"
-#include <stdlib.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <termios.h>
-#include <fcntl.h>
-#include <signal.h>
-#include <pwd.h>
-#include <errno.h>
-#include <sys/types.h>
-#include <syslog.h>
-
-#if TIME_WITH_SYS_TIME
-# include <sys/time.h>
-# include <time.h>
-#else
-# if HAVE_SYS_TIME_H
-# include <sys/time.h>
-# else
-# include <time.h>
-# endif
-#endif
-#include <sys/wait.h>
-#include <ctype.h>
-#include <string.h>
-#include <limits.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <assert.h>
-#include <arpa/inet.h>
-#include <libcitadel.h>
-#include <curl/curl.h>
-#include <curl/multi.h>
-#include "citadel.h"
-#include "server.h"
-#include "citserver.h"
-#include "support.h"
-#include "ctdl_module.h"
-#include "config.h"
-#include "event_client.h"
-#include "serv_curl.h"
-
-ev_loop *event_base;
-int DebugEventLoop = 0;
-int DebugEventLoopBacktrace = 0;
-int DebugCurl = 0;
-pthread_key_t evConKey;
-
-long EvIDSource = 1;
-/*****************************************************************************
- * libevent / curl integration *
- *****************************************************************************/
-
-#define MOPT(s, v) \
- do { \
- sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
- if (sta) { \
- syslog(LOG_ERR, "error setting option " \
- #s " on curl multi handle: %s\n", \
- curl_easy_strerror(sta)); \
- exit (1); \
- } \
- } while (0)
-
-
-typedef struct _evcurl_global_data {
- int magic;
- CURLM *mhnd;
- ev_timer timeev;
- int nrun;
-} evcurl_global_data;
-
-ev_async WakeupCurl;
-evcurl_global_data global;
-
-eNextState QueueAnDBOperation(AsyncIO *IO);
-
-static void
-gotstatus(int nnrun)
-{
- CURLMsg *msg;
- int nmsg;
-
- global.nrun = nnrun;
-
- syslog(LOG_DEBUG,
- "gotstatus(): about to call curl_multi_info_read\n");
- while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
- syslog(LOG_DEBUG,
- "got curl multi_info message msg=%d\n",
- msg->msg);
-
- if (CURLMSG_DONE == msg->msg) {
- CURL *chnd;
- void *chandle = NULL;
- CURLcode sta;
- CURLMcode msta;
- AsyncIO*IO;
-
- chandle = NULL;;
- chnd = msg->easy_handle;
- sta = curl_easy_getinfo(chnd,
- CURLINFO_PRIVATE,
- &chandle);
- if (sta) {
- syslog(LOG_ERR,
- "error asking curl for private"
- " cookie of curl handle: %s\n",
- curl_easy_strerror(sta));
- continue;
- }
- IO = (AsyncIO *)chandle;
- if (IO->ID == 0) {
- syslog(LOG_ERR,
- "Error, invalid IO context %p\n",
- IO);
- continue;
- }
- SetEVState(IO, eCurlGotStatus);
-
- syslog(LOG_DEBUG, "request complete\n");
-
- IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
-
- ev_io_stop(event_base, &IO->recv_event);
- ev_io_stop(event_base, &IO->send_event);
-
- sta = msg->data.result;
- if (sta) {
- syslog(LOG_ERR,
- "error description: %s\n",
- IO->HttpReq.errdesc);
- IO->HttpReq.CurlError = curl_easy_strerror(sta);
- syslog(LOG_ERR,
- "error performing request: %s\n",
- IO->HttpReq.CurlError);
- if (sta == CURLE_OPERATION_TIMEDOUT)
- {
- IO->SendBuf.fd = 0;
- IO->RecvBuf.fd = 0;
- }
- }
- sta = curl_easy_getinfo(chnd,
- CURLINFO_RESPONSE_CODE,
- &IO->HttpReq.httpcode);
- if (sta)
- syslog(LOG_ERR,
- "error asking curl for "
- "response code from request: %s\n",
- curl_easy_strerror(sta));
- syslog(LOG_DEBUG,
- "http response code was %ld\n",
- (long)IO->HttpReq.httpcode);
-
-
- curl_slist_free_all(IO->HttpReq.headers);
- IO->HttpReq.headers = NULL;
- msta = curl_multi_remove_handle(global.mhnd, chnd);
- if (msta)
- syslog(LOG_ERR,
- "warning problem detaching "
- "completed handle from curl multi: "
- "%s\n",
- curl_multi_strerror(msta));
-
- ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
-
- IO->HttpReq.attached = 0;
- switch(IO->SendDone(IO))
- {
- case eDBQuery:
- FreeURL(&IO->ConnectMe);
- QueueAnDBOperation(IO);
- break;
- case eSendDNSQuery:
- case eReadDNSReply:
- case eConnect:
- case eSendReply:
- case eSendMore:
- case eSendFile:
- case eReadMessage:
- case eReadMore:
- case eReadPayload:
- case eReadFile:
- break;
- case eTerminateConnection:
- case eAbort:
- curl_easy_cleanup(IO->HttpReq.chnd);
- IO->HttpReq.chnd = NULL;
- FreeStrBuf(&IO->HttpReq.ReplyData);
- FreeURL(&IO->ConnectMe);
- RemoveContext(IO->CitContext);
- IO->Terminate(IO);
- }
- }
- }
-}
-
-static void
-stepmulti(void *data, curl_socket_t fd, int which)
-{
- int running_handles = 0;
- CURLMcode msta;
-
- msta = curl_multi_socket_action(global.mhnd,
- fd,
- which,
- &running_handles);
-
- syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
- if (msta)
- syslog(LOG_ERR,
- "error in curl processing events"
- "on multi handle, fd %d: %s\n",
- (int)fd,
- curl_multi_strerror(msta));
-
- if (global.nrun != running_handles)
- gotstatus(running_handles);
-}
-
-static void
-gottime(struct ev_loop *loop, ev_timer *timeev, int events)
-{
- syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
- stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
-}
-
-static void
-got_in(struct ev_loop *loop, ev_io *ioev, int events)
-{
- syslog(LOG_DEBUG,
- "EVCURL: waking up curl for io on fd %d\n",
- (int)ioev->fd);
-
- stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
-}
-
-static void
-got_out(struct ev_loop *loop, ev_io *ioev, int events)
-{
- syslog(LOG_DEBUG,
- "waking up curl for io on fd %d\n",
- (int)ioev->fd);
-
- stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
-}
-
-static size_t
-gotdata(void *data, size_t size, size_t nmemb, void *cglobal)
-{
- AsyncIO *IO = (AsyncIO*) cglobal;
-
- SetEVState(IO, eCurlGotData);
- if (IO->HttpReq.ReplyData == NULL)
- {
- IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
- }
- IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
- return CurlFillStrBuf_callback(data,
- size,
- nmemb,
- IO->HttpReq.ReplyData);
-}
-
-static int
-gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
- syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
- evcurl_global_data *global = cglobal;
- ev_timer_stop(EV_DEFAULT, &global->timeev);
- if (tblock_ms < 0 || 14000 < tblock_ms)
- tblock_ms = 14000;
- ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
- ev_timer_start(EV_DEFAULT_UC, &global->timeev);
- curl_multi_perform(global, &global->nrun);
- return 0;
-}
-
-static int
-gotwatchsock(CURL *easy,
- curl_socket_t fd,
- int action,
- void *cglobal,
- void *vIO)
-{
- evcurl_global_data *global = cglobal;
- CURLM *mhnd = global->mhnd;
- void *f;
- AsyncIO *IO = (AsyncIO*) vIO;
- CURLcode sta;
- const char *Action;
-
- if (IO == NULL) {
- sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
- if (sta) {
- syslog(LOG_ERR,
- "EVCURL: error asking curl for private "
- "cookie of curl handle: %s\n",
- curl_easy_strerror(sta));
- return -1;
- }
- IO = (AsyncIO *) f;
- SetEVState(IO, eCurlNewIO);
- syslog(LOG_DEBUG,
- "EVCURL: got socket for URL: %s\n",
- IO->ConnectMe->PlainUrl);
-
- if (IO->SendBuf.fd != 0)
- {
- ev_io_stop(event_base, &IO->recv_event);
- ev_io_stop(event_base, &IO->send_event);
- }
- IO->SendBuf.fd = fd;
- ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
- ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
- curl_multi_assign(mhnd, fd, IO);
- }
-
- SetEVState(IO, eCurlGotIO);
- IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
-
- Action = "";
- switch (action)
- {
- case CURL_POLL_NONE:
- Action = "CURL_POLL_NONE";
- break;
- case CURL_POLL_REMOVE:
- Action = "CURL_POLL_REMOVE";
- break;
- case CURL_POLL_IN:
- Action = "CURL_POLL_IN";
- break;
- case CURL_POLL_OUT:
- Action = "CURL_POLL_OUT";
- break;
- case CURL_POLL_INOUT:
- Action = "CURL_POLL_INOUT";
- break;
- }
-
-
- syslog(LOG_DEBUG,
- "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
- (int)fd, Action, action);
-
- switch (action)
- {
- case CURL_POLL_NONE:
- syslog(LOG_DEBUG,
- "called first time "
- "to register this sockwatcker\n");
- break;
- case CURL_POLL_REMOVE:
- syslog(LOG_DEBUG,
- "called last time to unregister "
- "this sockwatcher\n");
- ev_io_stop(event_base, &IO->recv_event);
- ev_io_stop(event_base, &IO->send_event);
- break;
- case CURL_POLL_IN:
- ev_io_start(event_base, &IO->recv_event);
- ev_io_stop(event_base, &IO->send_event);
- break;
- case CURL_POLL_OUT:
- ev_io_start(event_base, &IO->send_event);
- ev_io_stop(event_base, &IO->recv_event);
- break;
- case CURL_POLL_INOUT:
- ev_io_start(event_base, &IO->send_event);
- ev_io_start(event_base, &IO->recv_event);
- break;
- }
- return 0;
-}
-
-void curl_init_connectionpool(void)
-{
- CURLM *mhnd ;
-
- ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
- global.timeev.data = (void *)&global;
- global.nrun = -1;
- CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
-
- if (sta)
- {
- syslog(LOG_ERR,
- "error initializing curl library: %s\n",
- curl_easy_strerror(sta));
-
- exit(1);
- }
- mhnd = global.mhnd = curl_multi_init();
- if (!mhnd)
- {
- syslog(LOG_ERR,
- "error initializing curl multi handle\n");
- exit(3);
- }
-
- MOPT(SOCKETFUNCTION, &gotwatchsock);
- MOPT(SOCKETDATA, (void *)&global);
- MOPT(TIMERFUNCTION, &gotwatchtime);
- MOPT(TIMERDATA, (void *)&global);
-
- return;
-}
-
-int evcurl_init(AsyncIO *IO)
-{
- CURLcode sta;
- CURL *chnd;
-
- syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
- IO->HttpReq.attached = 0;
- chnd = IO->HttpReq.chnd = curl_easy_init();
- if (!chnd)
- {
- syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
- return 0;
- }
-
-#if DEBUG
- OPT(VERBOSE, (long)1);
-#endif
- OPT(NOPROGRESS, 1L);
-
- OPT(NOSIGNAL, 1L);
- OPT(FAILONERROR, (long)1);
- OPT(ENCODING, "");
- OPT(FOLLOWLOCATION, (long)0);
- OPT(MAXREDIRS, (long)0);
- OPT(USERAGENT, CITADEL);
-
- OPT(TIMEOUT, (long)1800);
- OPT(LOW_SPEED_LIMIT, (long)64);
- OPT(LOW_SPEED_TIME, (long)600);
- OPT(CONNECTTIMEOUT, (long)600);
- OPT(PRIVATE, (void *)IO);
-
- OPT(FORBID_REUSE, 1);
- OPT(WRITEFUNCTION, &gotdata);
- OPT(WRITEDATA, (void *)IO);
- OPT(ERRORBUFFER, IO->HttpReq.errdesc);
-
- if ((!IsEmptyStr(CtdlGetConfigStr("c_ip_addr")))
- && (strcmp(CtdlGetConfigStr("c_ip_addr"), "*"))
- && (strcmp(CtdlGetConfigStr("c_ip_addr"), "::"))
- && (strcmp(CtdlGetConfigStr("c_ip_addr"), "0.0.0.0"))
- )
- {
- OPT(INTERFACE, CtdlGetConfigStr("c_ip_addr"));
- }
-
-#ifdef CURLOPT_HTTP_CONTENT_DECODING
- OPT(HTTP_CONTENT_DECODING, 1);
- OPT(ENCODING, "");
-#endif
-
- IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
- "Connection: close");
-
- return 1;
-}
-
-
-static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
- ev_cleanup *watcher,
- int revents)
-{
- CURLMcode msta;
- AsyncIO *IO = watcher->data;
-
- if (IO == NULL)
- return;
-
- SetEVState(IO, eCurlShutdown);
- IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
- syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
-
- curl_slist_free_all(IO->HttpReq.headers);
- IO->HttpReq.headers = NULL;
- msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
- if (msta)
- {
- syslog(LOG_ERR,
- "EVCURL: warning problem detaching completed handle "
- "from curl multi: %s\n",
- curl_multi_strerror(msta));
- }
-
- curl_easy_cleanup(IO->HttpReq.chnd);
- IO->HttpReq.chnd = NULL;
- ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
- ev_io_stop(event_base, &IO->recv_event);
- ev_io_stop(event_base, &IO->send_event);
- assert(IO->ShutdownAbort);
- IO->ShutdownAbort(IO);
-}
-
-eNextState
-evcurl_handle_start(AsyncIO *IO)
-{
- CURLMcode msta;
- CURLcode sta;
- CURL *chnd;
-
- SetEVState(IO, eCurlStart);
- chnd = IO->HttpReq.chnd;
- syslog(LOG_DEBUG,
- "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
- OPT(URL, IO->ConnectMe->PlainUrl);
- if (StrLength(IO->ConnectMe->CurlCreds))
- {
- OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
- OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
- }
- if (StrLength(IO->HttpReq.PostData) > 0)
- {
- OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
- OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
-
- }
- else if ((IO->HttpReq.PlainPostDataLen != 0) &&
- (IO->HttpReq.PlainPostData != NULL))
- {
- OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
- OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
- }
- OPT(HTTPHEADER, IO->HttpReq.headers);
-
- IO->NextState = eConnect;
- syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
- msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
- if (msta)
- {
- syslog(LOG_ERR,
- "EVCURL: error attaching to curl multi handle: %s\n",
- curl_multi_strerror(msta));
- }
-
- IO->HttpReq.attached = 1;
- ev_async_send (event_base, &WakeupCurl);
-
- ev_cleanup_init(&IO->abort_by_shutdown,
- IOcurl_abort_shutdown_callback);
-
- ev_cleanup_start(event_base, &IO->abort_by_shutdown);
-
- return eReadMessage;
-}
-
-static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
-{
- syslog(LOG_DEBUG, "waking up curl multi handle\n");
-
- curl_multi_perform(&global, CURL_POLL_NONE);
-}
-
-static void evcurl_shutdown (void)
-{
- curl_global_cleanup();
- curl_multi_cleanup(global.mhnd);
- syslog(LOG_DEBUG, "exiting\n");
-}
-/*****************************************************************************
- * libevent integration *
- *****************************************************************************/
-/*
- * client event queue plus its methods.
- * this currently is the main loop (which may change in some future?)
- */
-int evbase_count = 0;
-pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
-pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
-HashList *QueueEvents = NULL;
-HashList *InboundEventQueue = NULL;
-HashList *InboundEventQueues[2] = { NULL, NULL };
-extern void ShutDownCLient(AsyncIO *IO);
-
-ev_async AddJob;
-ev_async ExitEventLoop;
-
-static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
-{
- CitContext *Ctx;
- long IOID = -1;
- long count = 0;
- ev_tstamp Now;
- HashList *q;
- void *v;
- HashPos*It;
- long len;
- const char *Key;
-
- /* get the control command... */
- pthread_mutex_lock(&EventQueueMutex);
-
- if (InboundEventQueues[0] == InboundEventQueue) {
- InboundEventQueue = InboundEventQueues[1];
- q = InboundEventQueues[0];
- }
- else {
- InboundEventQueue = InboundEventQueues[0];
- q = InboundEventQueues[1];
- }
- pthread_mutex_unlock(&EventQueueMutex);
- Now = ev_now (event_base);
- It = GetNewHashPos(q, 0);
- while (GetNextHashPos(q, It, &len, &Key, &v))
- {
- IOAddHandler *h = v;
- count ++;
- if (h->IO->ID == 0) {
- h->IO->ID = EvIDSource++;
- }
- IOID = h->IO->ID;
- if (h->IO->StartIO == 0.0)
- h->IO->StartIO = Now;
-
- SetEVState(h->IO, eIOAttach);
-
- Ctx = h->IO->CitContext;
- become_session(Ctx);
-
- h->IO->CitContext->lastcmd = h->IO->Now = Now;
- switch (h->EvAttch(h->IO))
- {
- case eReadMore:
- case eReadMessage:
- case eReadFile:
- case eSendReply:
- case eSendMore:
- case eReadPayload:
- case eSendFile:
- case eDBQuery:
- case eSendDNSQuery:
- case eReadDNSReply:
- case eConnect:
- break;
- case eTerminateConnection:
- case eAbort:
- ShutDownCLient(h->IO);
- break;
- }
- }
- DeleteHashPos(&It);
- DeleteHashContent(&q);
- syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld done.", IOSTR, IOID, count);
-}
-
-
-static void EventExitCallback(EV_P_ ev_async *w, int revents)
-{
- ev_break(event_base, EVBREAK_ALL);
-
- syslog(LOG_DEBUG, "EVENT Q exiting.\n");
-}
-
-
-
-void InitEventQueue(void)
-{
- pthread_mutex_init(&EventQueueMutex, NULL);
- pthread_mutex_init(&EventExitQueueMutex, NULL);
-
- QueueEvents = NewHash(1, Flathash);
- InboundEventQueues[0] = NewHash(1, Flathash);
- InboundEventQueues[1] = NewHash(1, Flathash);
- InboundEventQueue = InboundEventQueues[0];
-}
-extern void CtdlDestroyEVCleanupHooks(void);
-
-extern int EVQShutDown;
-const char *IOLog = "IO";
-/*
- * this thread operates the select() etc. via libev.
- */
-void *client_event_thread(void *arg)
-{
- struct CitContext libev_client_CC;
-
- CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
-
- pthread_setspecific(evConKey, IOLog);
-
- syslog(LOG_DEBUG, "client_event_thread() initializing\n");
-
- event_base = ev_default_loop (EVFLAG_AUTO);
- ev_async_init(&AddJob, QueueEventAddCallback);
- ev_async_start(event_base, &AddJob);
- ev_async_init(&ExitEventLoop, EventExitCallback);
- ev_async_start(event_base, &ExitEventLoop);
- ev_async_init(&WakeupCurl, WakeupCurlCallback);
- ev_async_start(event_base, &WakeupCurl);
-
- curl_init_connectionpool();
-
- ev_run (event_base, 0);
-
- syslog(LOG_DEBUG, "client_event_thread() exiting\n");
-
-///what todo here? CtdlClearSystemContext();
- pthread_mutex_lock(&EventExitQueueMutex);
- ev_loop_destroy (EV_DEFAULT_UC);
- event_base = NULL;
- DeleteHash(&QueueEvents);
- InboundEventQueue = NULL;
- DeleteHash(&InboundEventQueues[0]);
- DeleteHash(&InboundEventQueues[1]);
-/* citthread_mutex_destroy(&EventQueueMutex); TODO */
- evcurl_shutdown();
-
- CtdlDestroyEVCleanupHooks();
-
- pthread_mutex_unlock(&EventExitQueueMutex);
- EVQShutDown = 1;
- return(NULL);
-}
-
-/*----------------------------------------------------------------------------*/
-/*
- * DB-Queue; does async bdb operations.
- * has its own set of handlers.
- */
-ev_loop *event_db;
-int evdb_count = 0;
-pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
-pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
-HashList *DBQueueEvents = NULL;
-HashList *DBInboundEventQueue = NULL;
-HashList *DBInboundEventQueues[2] = { NULL, NULL };
-
-ev_async DBAddJob;
-ev_async DBExitEventLoop;
-
-extern void ShutDownDBCLient(AsyncIO *IO);
-
-static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
-{
- CitContext *Ctx;
- long IOID = -1;
- long count = 0;;
- ev_tstamp Now;
- HashList *q;
- void *v;
- HashPos *It;
- long len;
- const char *Key;
-
- /* get the control command... */
- pthread_mutex_lock(&DBEventQueueMutex);
-
- if (DBInboundEventQueues[0] == DBInboundEventQueue) {
- DBInboundEventQueue = DBInboundEventQueues[1];
- q = DBInboundEventQueues[0];
- }
- else {
- DBInboundEventQueue = DBInboundEventQueues[0];
- q = DBInboundEventQueues[1];
- }
- pthread_mutex_unlock(&DBEventQueueMutex);
-
- Now = ev_now (event_db);
- It = GetNewHashPos(q, 0);
- while (GetNextHashPos(q, It, &len, &Key, &v))
- {
- IOAddHandler *h = v;
- eNextState rc;
- count ++;
- if (h->IO->ID == 0)
- h->IO->ID = EvIDSource++;
- IOID = h->IO->ID;
- if (h->IO->StartDB == 0.0)
- h->IO->StartDB = Now;
- h->IO->CitContext->lastcmd = h->IO->Now = Now;
-
- SetEVState(h->IO, eDBAttach);
- Ctx = h->IO->CitContext;
- become_session(Ctx);
- ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
- rc = h->EvAttch(h->IO);
- switch (rc)
- {
- case eAbort:
- ShutDownDBCLient(h->IO);
- default:
- break;
- }
- }
- DeleteHashPos(&It);
- DeleteHashContent(&q);
- syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count);
-}
-
-
-static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
-{
- syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
- ev_break(event_db, EVBREAK_ALL);
-}
-
-
-
-void DBInitEventQueue(void)
-{
- pthread_mutex_init(&DBEventQueueMutex, NULL);
- pthread_mutex_init(&DBEventExitQueueMutex, NULL);
-
- DBQueueEvents = NewHash(1, Flathash);
- DBInboundEventQueues[0] = NewHash(1, Flathash);
- DBInboundEventQueues[1] = NewHash(1, Flathash);
- DBInboundEventQueue = DBInboundEventQueues[0];
-}
-
-const char *DBLog = "BD";
-
-/*
- * this thread operates writing to the message database via libev.
- */
-void *db_event_thread(void *arg)
-{
- ev_loop *tmp;
- struct CitContext libev_msg_CC;
-
- pthread_setspecific(evConKey, DBLog);
-
- CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
-
- syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
-
- tmp = event_db = ev_loop_new (EVFLAG_AUTO);
-
- ev_async_init(&DBAddJob, DBQueueEventAddCallback);
- ev_async_start(event_db, &DBAddJob);
- ev_async_init(&DBExitEventLoop, DBEventExitCallback);
- ev_async_start(event_db, &DBExitEventLoop);
-
- ev_run (event_db, 0);
-
- pthread_mutex_lock(&DBEventExitQueueMutex);
-
- event_db = NULL;
- syslog(LOG_INFO, "dbevent_thread() exiting\n");
-
- DeleteHash(&DBQueueEvents);
- DBInboundEventQueue = NULL;
- DeleteHash(&DBInboundEventQueues[0]);
- DeleteHash(&DBInboundEventQueues[1]);
-
-/* citthread_mutex_destroy(&DBEventQueueMutex); TODO */
-
- ev_loop_destroy (tmp);
- pthread_mutex_unlock(&DBEventExitQueueMutex);
- return(NULL);
-}
-
-void ShutDownEventQueues(void)
-{
- syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
-
- pthread_mutex_lock(&DBEventQueueMutex);
- ev_async_send (event_db, &DBExitEventLoop);
- pthread_mutex_unlock(&DBEventQueueMutex);
-
- pthread_mutex_lock(&EventQueueMutex);
- ev_async_send (EV_DEFAULT_ &ExitEventLoop);
- pthread_mutex_unlock(&EventQueueMutex);
-}
-
-void DebugEventloopEnable(const int n)
-{
- DebugEventLoop = n;
-}
-void DebugEventloopBacktraceEnable(const int n)
-{
- DebugEventLoopBacktrace = n;
-}
-
-void DebugCurlEnable(const int n)
-{
- DebugCurl = n;
-}
-
-const char *WLog = "WX";
-CTDL_MODULE_INIT(event_client)
-{
- if (!threading)
- {
- if (pthread_key_create(&evConKey, NULL) != 0) {
- syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno));
- }
- pthread_setspecific(evConKey, WLog);
-
- CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
- CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
- CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
- InitEventQueue();
- DBInitEventQueue();
- CtdlThreadCreate(client_event_thread);
- CtdlThreadCreate(db_event_thread);
- }
- return "event";
-}