ce5d7f0412c12246e8663e6d2025b992cd6992f7
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63
64 long EvIDSource = 1;
65 /*****************************************************************************
66  *                   libevent / curl integration                             *
67  *****************************************************************************/
68 #define MOPT(s, v)\
69         do {                                                            \
70                 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
71                 if (sta) {                                              \
72                         syslog(LOG_ERR, "EVCURL: error setting option " \
73                                #s " on curl multi handle: %s\n",        \
74                                curl_easy_strerror(sta));                \
75                         exit (1);                                       \
76                 }                                                       \
77         } while (0)
78
79 typedef struct _evcurl_global_data {
80         int magic;
81         CURLM *mhnd;
82         ev_timer timeev;
83         int nrun;
84 } evcurl_global_data;
85
86 ev_async WakeupCurl;
87 evcurl_global_data global;
88
89 static void
90 gotstatus(int nnrun)
91 {
92         CURLMsg *msg;
93         int nmsg;
94
95         global.nrun = nnrun;
96
97         syslog(LOG_DEBUG,
98                "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
99         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
100                 syslog(LOG_ERR,
101                        "EVCURL: got curl multi_info message msg=%d\n",
102                        msg->msg);
103
104                 if (CURLMSG_DONE == msg->msg) {
105                         CURL *chnd;
106                         char *chandle;
107                         CURLcode sta;
108                         CURLMcode msta;
109                         AsyncIO*IO;
110
111                         chandle = NULL;;
112                         chnd = msg->easy_handle;
113                         sta = curl_easy_getinfo(chnd,
114                                                 CURLINFO_PRIVATE,
115                                                 &chandle);
116                         syslog(LOG_ERR, "EVCURL: request complete\n");
117                         if (sta)
118                                 syslog(LOG_ERR,
119                                        "EVCURL: error asking curl for private"
120                                        " cookie of curl handle: %s\n",
121                                        curl_easy_strerror(sta));
122                         IO = (AsyncIO *)chandle;
123
124                         IO->Now = ev_now(event_base);
125
126                         ev_io_stop(event_base, &IO->recv_event);
127                         ev_io_stop(event_base, &IO->send_event);
128
129                         sta = msg->data.result;
130                         if (sta) {
131                                 EV_syslog(LOG_ERR,
132                                           "EVCURL: error description: %s\n",
133                                           IO->HttpReq.errdesc);
134                                 EV_syslog(LOG_ERR,
135                                           "EVCURL: error performing request: %s\n",
136                                           curl_easy_strerror(sta));
137                         }
138                         sta = curl_easy_getinfo(chnd,
139                                                 CURLINFO_RESPONSE_CODE,
140                                                 &IO->HttpReq.httpcode);
141                         if (sta)
142                                 EV_syslog(LOG_ERR,
143                                           "EVCURL: error asking curl for "
144                                           "response code from request: %s\n",
145                                           curl_easy_strerror(sta));
146                         EV_syslog(LOG_ERR,
147                                   "EVCURL: http response code was %ld\n",
148                                   (long)IO->HttpReq.httpcode);
149
150
151                         curl_slist_free_all(IO->HttpReq.headers);
152                         msta = curl_multi_remove_handle(global.mhnd, chnd);
153                         if (msta)
154                                 EV_syslog(LOG_ERR,
155                                           "EVCURL: warning problem detaching "
156                                           "completed handle from curl multi: "
157                                           "%s\n",
158                                           curl_multi_strerror(msta));
159
160                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
161
162                         IO->HttpReq.attached = 0;
163                         switch(IO->SendDone(IO))
164                         {
165                         case eDBQuery:
166                                 curl_easy_cleanup(IO->HttpReq.chnd);
167                                 IO->HttpReq.chnd = NULL;
168                                 break;
169                         case eSendDNSQuery:
170                         case eReadDNSReply:
171                         case eConnect:
172                         case eSendReply:
173                         case eSendMore:
174                         case eSendFile:
175                         case eReadMessage:
176                         case eReadMore:
177                         case eReadPayload:
178                         case eReadFile:
179                                 curl_easy_cleanup(IO->HttpReq.chnd);
180                                 IO->HttpReq.chnd = NULL;
181                                 break;
182                         case eTerminateConnection:
183                         case eAbort:
184                                 curl_easy_cleanup(IO->HttpReq.chnd);
185                                 IO->HttpReq.chnd = NULL;
186                                 FreeStrBuf(&IO->HttpReq.ReplyData);
187                                 FreeURL(&IO->ConnectMe);
188                                 RemoveContext(IO->CitContext);
189                                 IO->Terminate(IO);
190                         }
191                 }
192         }
193 }
194
195 static void
196 stepmulti(void *data, curl_socket_t fd, int which)
197 {
198         int running_handles = 0;
199         CURLMcode msta;
200
201         msta = curl_multi_socket_action(global.mhnd,
202                                         fd,
203                                         which,
204                                         &running_handles);
205
206         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
207         if (msta)
208                 syslog(LOG_ERR,
209                        "EVCURL: error in curl processing events"
210                        "on multi handle, fd %d: %s\n",
211                        (int)fd,
212                        curl_multi_strerror(msta));
213
214         if (global.nrun != running_handles)
215                 gotstatus(running_handles);
216 }
217
218 static void
219 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
220 {
221         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
222         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
223 }
224
225 static void
226 got_in(struct ev_loop *loop, ev_io *ioev, int events)
227 {
228         syslog(LOG_DEBUG,
229                "EVCURL: waking up curl for io on fd %d\n",
230                (int)ioev->fd);
231
232         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
233 }
234
235 static void
236 got_out(struct ev_loop *loop, ev_io *ioev, int events)
237 {
238         syslog(LOG_DEBUG,
239                "EVCURL: waking up curl for io on fd %d\n",
240                (int)ioev->fd);
241
242         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
243 }
244
245 static size_t
246 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
247         AsyncIO *IO = (AsyncIO*) cglobal;
248
249         if (IO->HttpReq.ReplyData == NULL)
250         {
251                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
252         }
253         IO->Now = ev_now(event_base);
254         return CurlFillStrBuf_callback(data,
255                                        size,
256                                        nmemb,
257                                        IO->HttpReq.ReplyData);
258 }
259
260 static int
261 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
262         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
263         evcurl_global_data *global = cglobal;
264         ev_timer_stop(EV_DEFAULT, &global->timeev);
265         if (tblock_ms < 0 || 14000 < tblock_ms)
266                 tblock_ms = 14000;
267         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
268         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
269         curl_multi_perform(global, &global->nrun);
270         return 0;
271 }
272
273 static int
274 gotwatchsock(CURL *easy,
275              curl_socket_t fd,
276              int action,
277              void *cglobal,
278              void *vIO)
279 {
280         evcurl_global_data *global = cglobal;
281         CURLM *mhnd = global->mhnd;
282         char *f;
283         AsyncIO *IO = (AsyncIO*) vIO;
284         CURLcode sta;
285         const char *Action;
286
287         if (IO == NULL) {
288                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
289                 if (sta) {
290                         EV_syslog(LOG_ERR,
291                                   "EVCURL: error asking curl for private "
292                                   "cookie of curl handle: %s\n",
293                                   curl_easy_strerror(sta));
294                         return -1;
295                 }
296                 IO = (AsyncIO *) f;
297                 EV_syslog(LOG_DEBUG,
298                           "EVCURL: got socket for URL: %s\n",
299                           IO->ConnectMe->PlainUrl);
300
301                 if (IO->SendBuf.fd != 0)
302                 {
303                         ev_io_stop(event_base, &IO->recv_event);
304                         ev_io_stop(event_base, &IO->send_event);
305                 }
306                 IO->SendBuf.fd = fd;
307                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
308                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
309                 curl_multi_assign(mhnd, fd, IO);
310         }
311
312         IO->Now = ev_now(event_base);
313
314         Action = "";
315         switch (action)
316         {
317         case CURL_POLL_NONE:
318                 Action = "CURL_POLL_NONE";
319                 break;
320         case CURL_POLL_REMOVE:
321                 Action = "CURL_POLL_REMOVE";
322                 break;
323         case CURL_POLL_IN:
324                 Action = "CURL_POLL_IN";
325                 break;
326         case CURL_POLL_OUT:
327                 Action = "CURL_POLL_OUT";
328                 break;
329         case CURL_POLL_INOUT:
330                 Action = "CURL_POLL_INOUT";
331                 break;
332         }
333
334
335         EV_syslog(LOG_DEBUG,
336                   "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
337                   (int)fd, Action, action);
338
339         switch (action)
340         {
341         case CURL_POLL_NONE:
342                 EVM_syslog(LOG_ERR,
343                            "EVCURL: called first time "
344                            "to register this sockwatcker\n");
345                 break;
346         case CURL_POLL_REMOVE:
347                 EVM_syslog(LOG_ERR,
348                            "EVCURL: called last time to unregister "
349                            "this sockwatcher\n");
350                 ev_io_stop(event_base, &IO->recv_event);
351                 ev_io_stop(event_base, &IO->send_event);
352                 break;
353         case CURL_POLL_IN:
354                 ev_io_start(event_base, &IO->recv_event);
355                 ev_io_stop(event_base, &IO->send_event);
356                 break;
357         case CURL_POLL_OUT:
358                 ev_io_start(event_base, &IO->send_event);
359                 ev_io_stop(event_base, &IO->recv_event);
360                 break;
361         case CURL_POLL_INOUT:
362                 ev_io_start(event_base, &IO->send_event);
363                 ev_io_start(event_base, &IO->recv_event);
364                 break;
365         }
366         return 0;
367 }
368
369 void curl_init_connectionpool(void)
370 {
371         CURLM *mhnd ;
372
373         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
374         global.timeev.data = (void *)&global;
375         global.nrun = -1;
376         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
377
378         if (sta)
379         {
380                 syslog(LOG_ERR,
381                        "EVCURL: error initializing curl library: %s\n",
382                        curl_easy_strerror(sta));
383
384                 exit(1);
385         }
386         mhnd = global.mhnd = curl_multi_init();
387         if (!mhnd)
388         {
389                 syslog(LOG_ERR,
390                        "EVCURL: error initializing curl multi handle\n");
391                 exit(3);
392         }
393
394         MOPT(SOCKETFUNCTION, &gotwatchsock);
395         MOPT(SOCKETDATA, (void *)&global);
396         MOPT(TIMERFUNCTION, &gotwatchtime);
397         MOPT(TIMERDATA, (void *)&global);
398
399         return;
400 }
401
402 int evcurl_init(AsyncIO *IO)
403 {
404         CURLcode sta;
405         CURL *chnd;
406
407         EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
408         IO->HttpReq.attached = 0;
409         chnd = IO->HttpReq.chnd = curl_easy_init();
410         if (!chnd)
411         {
412                 EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
413                 return 0;
414         }
415
416 #if DEBUG
417         OPT(VERBOSE, (long)1);
418 #endif
419         OPT(NOPROGRESS, 1L);
420
421         OPT(NOSIGNAL, 1L);
422         OPT(FAILONERROR, (long)1);
423         OPT(ENCODING, "");
424         OPT(FOLLOWLOCATION, (long)0);
425         OPT(MAXREDIRS, (long)0);
426         OPT(USERAGENT, CITADEL);
427
428         OPT(TIMEOUT, (long)1800);
429         OPT(LOW_SPEED_LIMIT, (long)64);
430         OPT(LOW_SPEED_TIME, (long)600);
431         OPT(CONNECTTIMEOUT, (long)600);
432         OPT(PRIVATE, (void *)IO);
433
434         OPT(FORBID_REUSE, 1);
435         OPT(WRITEFUNCTION, &gotdata);
436         OPT(WRITEDATA, (void *)IO);
437         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
438
439         if ((!IsEmptyStr(config.c_ip_addr))
440                 && (strcmp(config.c_ip_addr, "*"))
441                 && (strcmp(config.c_ip_addr, "::"))
442                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
443                 )
444         {
445                 OPT(INTERFACE, config.c_ip_addr);
446         }
447
448 #ifdef CURLOPT_HTTP_CONTENT_DECODING
449         OPT(HTTP_CONTENT_DECODING, 1);
450         OPT(ENCODING, "");
451 #endif
452
453         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
454                                                 "Connection: close");
455
456         return 1;
457 }
458
459
460 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
461                                            ev_cleanup *watcher,
462                                            int revents)
463 {
464         CURLMcode msta;
465         AsyncIO *IO = watcher->data;
466         IO->Now = ev_now(event_base);
467         EV_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
468
469         curl_slist_free_all(IO->HttpReq.headers);
470         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
471         if (msta)
472         {
473                 EV_syslog(LOG_ERR,
474                           "EVCURL: warning problem detaching completed handle "
475                           "from curl multi: %s\n",
476                           curl_multi_strerror(msta));
477         }
478
479         curl_easy_cleanup(IO->HttpReq.chnd);
480         IO->HttpReq.chnd = NULL;
481         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
482         ev_io_stop(event_base, &IO->recv_event);
483         ev_io_stop(event_base, &IO->send_event);
484         assert(IO->ShutdownAbort);
485         IO->ShutdownAbort(IO);
486 }
487 eNextState
488 evcurl_handle_start(AsyncIO *IO)
489 {
490         CURLMcode msta;
491         CURLcode sta;
492         CURL *chnd;
493
494         chnd = IO->HttpReq.chnd;
495         EV_syslog(LOG_DEBUG,
496                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
497         OPT(URL, IO->ConnectMe->PlainUrl);
498         if (StrLength(IO->ConnectMe->CurlCreds))
499         {
500                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
501                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
502         }
503         if (StrLength(IO->HttpReq.PostData) > 0)
504         {
505                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
506                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
507
508         }
509         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
510                  (IO->HttpReq.PlainPostData != NULL))
511         {
512                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
513                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
514         }
515         OPT(HTTPHEADER, IO->HttpReq.headers);
516
517         IO->NextState = eConnect;
518         EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
519         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
520         if (msta)
521         {
522                 EV_syslog(LOG_ERR,
523                           "EVCURL: error attaching to curl multi handle: %s\n",
524                           curl_multi_strerror(msta));
525         }
526
527         IO->HttpReq.attached = 1;
528         ev_async_send (event_base, &WakeupCurl);
529         ev_cleanup_init(&IO->abort_by_shutdown,
530                         IOcurl_abort_shutdown_callback);
531
532         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
533         return eReadMessage;
534 }
535
536 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
537 {
538         syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
539
540         curl_multi_perform(&global, CURL_POLL_NONE);
541 }
542
543 static void evcurl_shutdown (void)
544 {
545         curl_global_cleanup();
546         curl_multi_cleanup(global.mhnd);
547         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
548 }
549 /*****************************************************************************
550  *                       libevent integration                                *
551  *****************************************************************************/
552 /*
553  * client event queue plus its methods.
554  * this currently is the main loop (which may change in some future?)
555  */
556 int evbase_count = 0;
557 int event_add_pipe[2] = {-1, -1};
558 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
559 HashList *QueueEvents = NULL;
560 HashList *InboundEventQueue = NULL;
561 HashList *InboundEventQueues[2] = { NULL, NULL };
562
563 ev_async AddJob;
564 ev_async ExitEventLoop;
565
566 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
567 {
568         ev_tstamp Now;
569         HashList *q;
570         void *v;
571         HashPos*It;
572         long len;
573         const char *Key;
574
575         /* get the control command... */
576         pthread_mutex_lock(&EventQueueMutex);
577
578         if (InboundEventQueues[0] == InboundEventQueue) {
579                 InboundEventQueue = InboundEventQueues[1];
580                 q = InboundEventQueues[0];
581         }
582         else {
583                 InboundEventQueue = InboundEventQueues[0];
584                 q = InboundEventQueues[1];
585         }
586         pthread_mutex_unlock(&EventQueueMutex);
587         Now = ev_now (event_base);
588         It = GetNewHashPos(q, 0);
589         while (GetNextHashPos(q, It, &len, &Key, &v))
590         {
591                 IOAddHandler *h = v;
592                 if (h->IO->ID == 0) {
593                         h->IO->ID = EvIDSource++;
594                 }
595                 if (h->IO->StartIO == 0.0)
596                         h->IO->StartIO = Now;
597                 h->IO->Now = Now;
598                 h->EvAttch(h->IO);
599         }
600         DeleteHashPos(&It);
601         DeleteHashContent(&q);
602         syslog(LOG_DEBUG, "EVENT Q Add done.\n");
603 }
604
605
606 static void EventExitCallback(EV_P_ ev_async *w, int revents)
607 {
608         ev_break(event_base, EVBREAK_ALL);
609
610         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
611 }
612
613
614
615 void InitEventQueue(void)
616 {
617         struct rlimit LimitSet;
618
619         pthread_mutex_init(&EventQueueMutex, NULL);
620
621         if (pipe(event_add_pipe) != 0) {
622                 syslog(LOG_EMERG,
623                        "Unable to create pipe for libev queueing: %s\n",
624                        strerror(errno));
625                 abort();
626         }
627         LimitSet.rlim_cur = 1;
628         LimitSet.rlim_max = 1;
629         setrlimit(event_add_pipe[1], &LimitSet);
630
631         QueueEvents = NewHash(1, Flathash);
632         InboundEventQueues[0] = NewHash(1, Flathash);
633         InboundEventQueues[1] = NewHash(1, Flathash);
634         InboundEventQueue = InboundEventQueues[0];
635 }
636 extern void CtdlDestroyEVCleanupHooks(void);
637
638 extern int EVQShutDown;
639 /*
640  * this thread operates the select() etc. via libev.
641  */
642 void *client_event_thread(void *arg) 
643 {
644         struct CitContext libev_client_CC;
645
646         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
647 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
648         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
649
650         event_base = ev_default_loop (EVFLAG_AUTO);
651         ev_async_init(&AddJob, QueueEventAddCallback);
652         ev_async_start(event_base, &AddJob);
653         ev_async_init(&ExitEventLoop, EventExitCallback);
654         ev_async_start(event_base, &ExitEventLoop);
655         ev_async_init(&WakeupCurl, WakeupCurlCallback);
656         ev_async_start(event_base, &WakeupCurl);
657
658         curl_init_connectionpool();
659
660         ev_run (event_base, 0);
661
662         syslog(LOG_DEBUG, "client_event_thread() exiting\n");
663
664 ///what todo here?      CtdlClearSystemContext();
665         ev_loop_destroy (EV_DEFAULT_UC);
666         DeleteHash(&QueueEvents);
667         InboundEventQueue = NULL;
668         DeleteHash(&InboundEventQueues[0]);
669         DeleteHash(&InboundEventQueues[1]);
670 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
671         evcurl_shutdown();
672         close(event_add_pipe[0]);
673         close(event_add_pipe[1]);
674
675         CtdlDestroyEVCleanupHooks();
676
677         EVQShutDown = 1;        
678         return(NULL);
679 }
680
681 /*----------------------------------------------------------------------------*/
682 /*
683  * DB-Queue; does async bdb operations.
684  * has its own set of handlers.
685  */
686 ev_loop *event_db;
687 int evdb_count = 0;
688 int evdb_add_pipe[2] = {-1, -1};
689 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
690 HashList *DBQueueEvents = NULL;
691 HashList *DBInboundEventQueue = NULL;
692 HashList *DBInboundEventQueues[2] = { NULL, NULL };
693
694 ev_async DBAddJob;
695 ev_async DBExitEventLoop;
696
697 extern void ShutDownDBCLient(AsyncIO *IO);
698
699 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
700 {
701         ev_tstamp Now;
702         HashList *q;
703         void *v;
704         HashPos *It;
705         long len;
706         const char *Key;
707
708         /* get the control command... */
709         pthread_mutex_lock(&DBEventQueueMutex);
710
711         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
712                 DBInboundEventQueue = DBInboundEventQueues[1];
713                 q = DBInboundEventQueues[0];
714         }
715         else {
716                 DBInboundEventQueue = DBInboundEventQueues[0];
717                 q = DBInboundEventQueues[1];
718         }
719         pthread_mutex_unlock(&DBEventQueueMutex);
720
721         Now = ev_now (event_db);
722         It = GetNewHashPos(q, 0);
723         while (GetNextHashPos(q, It, &len, &Key, &v))
724         {
725                 IOAddHandler *h = v;
726                 eNextState rc;
727                 if (h->IO->ID == 0)
728                         h->IO->ID = EvIDSource++;
729                 if (h->IO->StartDB == 0.0)
730                         h->IO->StartDB = Now;
731                 h->IO->Now = Now;
732                 rc = h->EvAttch(h->IO);
733                 switch (rc)
734                 {
735                 case eAbort:
736                         ShutDownDBCLient(h->IO);
737                 default:
738                         break;
739                 }
740         }
741         DeleteHashPos(&It);
742         DeleteHashContent(&q);
743         syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
744 }
745
746
747 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
748 {
749         syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
750         ev_break(event_db, EVBREAK_ALL);
751 }
752
753
754
755 void DBInitEventQueue(void)
756 {
757         struct rlimit LimitSet;
758
759         pthread_mutex_init(&DBEventQueueMutex, NULL);
760
761         if (pipe(evdb_add_pipe) != 0) {
762                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
763                 abort();
764         }
765         LimitSet.rlim_cur = 1;
766         LimitSet.rlim_max = 1;
767         setrlimit(evdb_add_pipe[1], &LimitSet);
768
769         DBQueueEvents = NewHash(1, Flathash);
770         DBInboundEventQueues[0] = NewHash(1, Flathash);
771         DBInboundEventQueues[1] = NewHash(1, Flathash);
772         DBInboundEventQueue = DBInboundEventQueues[0];
773 }
774
775 /*
776  * this thread operates writing to the message database via libev.
777  */
778 void *db_event_thread(void *arg)
779 {
780         struct CitContext libev_msg_CC;
781
782         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
783 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
784         syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
785
786         event_db = ev_loop_new (EVFLAG_AUTO);
787
788         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
789         ev_async_start(event_db, &DBAddJob);
790         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
791         ev_async_start(event_db, &DBExitEventLoop);
792
793         ev_run (event_db, 0);
794
795         syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
796
797 //// what to do here?   CtdlClearSystemContext();
798         ev_loop_destroy (event_db);
799
800         DeleteHash(&DBQueueEvents);
801         DBInboundEventQueue = NULL;
802         DeleteHash(&DBInboundEventQueues[0]);
803         DeleteHash(&DBInboundEventQueues[1]);
804
805         close(evdb_add_pipe[0]);
806         close(evdb_add_pipe[1]);
807 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
808
809         return(NULL);
810 }
811
812 void ShutDownEventQueues(void)
813 {
814         syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
815
816         pthread_mutex_lock(&DBEventQueueMutex);
817         ev_async_send (event_db, &DBExitEventLoop);
818         pthread_mutex_unlock(&DBEventQueueMutex);
819
820         pthread_mutex_lock(&EventQueueMutex);
821         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
822         pthread_mutex_unlock(&EventQueueMutex);
823 }
824
825 CTDL_MODULE_INIT(event_client)
826 {
827         if (!threading)
828         {
829                 InitEventQueue();
830                 DBInitEventQueue();
831                 CtdlThreadCreate(client_event_thread);
832                 CtdlThreadCreate(db_event_thread);
833         }
834         return "event";
835 }