+/*----------------------------------------------------------------------------*/
+/*
+ * DB-Queue; does async bdb operations.
+ * has its own set of handlers.
+ */
+ev_loop *event_db;
+int evdb_count = 0;
+int evdb_add_pipe[2] = {-1, -1};
+pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
+HashList *DBQueueEvents = NULL;
+HashList *DBInboundEventQueue = NULL;
+HashList *DBInboundEventQueues[2] = { NULL, NULL };
+
+ev_async DBAddJob;
+ev_async DBExitEventLoop;
+
+extern void ShutDownDBCLient(AsyncIO *IO);
+
+static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
+{
+ ev_tstamp Now;
+ HashList *q;
+ void *v;
+ HashPos *It;
+ long len;
+ const char *Key;
+
+ /* get the control command... */
+ pthread_mutex_lock(&DBEventQueueMutex);
+
+ if (DBInboundEventQueues[0] == DBInboundEventQueue) {
+ DBInboundEventQueue = DBInboundEventQueues[1];
+ q = DBInboundEventQueues[0];
+ }
+ else {
+ DBInboundEventQueue = DBInboundEventQueues[0];
+ q = DBInboundEventQueues[1];
+ }
+ pthread_mutex_unlock(&DBEventQueueMutex);
+
+ Now = ev_now (event_db);
+ It = GetNewHashPos(q, 0);
+ while (GetNextHashPos(q, It, &len, &Key, &v))
+ {
+ IOAddHandler *h = v;
+ eNextState rc;
+ if (h->IO->ID == 0)
+ h->IO->ID = EvIDSource++;
+ if (h->IO->StartDB == 0.0)
+ h->IO->StartDB = Now;
+ h->IO->Now = Now;
+ rc = h->EvAttch(h->IO);
+ switch (rc)
+ {
+ case eAbort:
+ ShutDownDBCLient(h->IO);
+ default:
+ break;
+ }
+ }
+ DeleteHashPos(&It);
+ DeleteHashContent(&q);
+ syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
+}
+
+
+static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
+{
+ syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
+ ev_break(event_db, EVBREAK_ALL);
+}
+
+
+
+void DBInitEventQueue(void)
+{
+ struct rlimit LimitSet;
+
+ pthread_mutex_init(&DBEventQueueMutex, NULL);
+
+ if (pipe(evdb_add_pipe) != 0) {
+ syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
+ abort();
+ }
+ LimitSet.rlim_cur = 1;
+ LimitSet.rlim_max = 1;
+ setrlimit(evdb_add_pipe[1], &LimitSet);
+
+ DBQueueEvents = NewHash(1, Flathash);
+ DBInboundEventQueues[0] = NewHash(1, Flathash);
+ DBInboundEventQueues[1] = NewHash(1, Flathash);
+ DBInboundEventQueue = DBInboundEventQueues[0];
+}
+
+/*
+ * this thread operates writing to the message database via libev.
+ */
+void *db_event_thread(void *arg)
+{
+ struct CitContext libev_msg_CC;
+
+ CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
+// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+ syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
+
+ event_db = ev_loop_new (EVFLAG_AUTO);
+
+ ev_async_init(&DBAddJob, DBQueueEventAddCallback);
+ ev_async_start(event_db, &DBAddJob);
+ ev_async_init(&DBExitEventLoop, DBEventExitCallback);
+ ev_async_start(event_db, &DBExitEventLoop);
+
+ ev_run (event_db, 0);
+
+ syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
+
+//// what to do here? CtdlClearSystemContext();
+ ev_loop_destroy (event_db);
+
+ DeleteHash(&DBQueueEvents);
+ DBInboundEventQueue = NULL;
+ DeleteHash(&DBInboundEventQueues[0]);
+ DeleteHash(&DBInboundEventQueues[1]);
+
+ close(evdb_add_pipe[0]);
+ close(evdb_add_pipe[1]);
+/* citthread_mutex_destroy(&DBEventQueueMutex); TODO */
+
+ return(NULL);
+}
+
+void ShutDownEventQueues(void)
+{
+ syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
+
+ pthread_mutex_lock(&DBEventQueueMutex);
+ ev_async_send (event_db, &DBExitEventLoop);
+ pthread_mutex_unlock(&DBEventQueueMutex);
+
+ pthread_mutex_lock(&EventQueueMutex);
+ ev_async_send (EV_DEFAULT_ &ExitEventLoop);
+ pthread_mutex_unlock(&EventQueueMutex);
+}
+
+void DebugEventloopEnable(void)
+{
+ DebugEventLoop = 1;
+}
+
+void DebugCurlEnable(void)
+{
+ DebugCurl = 1;
+}