a079d123206d2f24f179d130a2973fb74822cc5b
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63 int DebugEventLoop = 0;
64 int DebugEventLoopBacktrace = 0;
65 int DebugCurl = 0;
66
67 long EvIDSource = 1;
/*****************************************************************************
 *                   libev / curl integration                                *
 *****************************************************************************/
/*
 * MOPT(s, v): set option CURLMOPT_<s> to v on the multi handle called `mhnd`
 * in the invoking scope.  A failure is logged and terminates the process.
 *
 * curl_multi_setopt() reports a CURLMcode, so the result is held in a
 * macro-local CURLMcode and rendered with curl_multi_strerror(); feeding the
 * value to curl_easy_strerror() would print the text of an unrelated CURLcode.
 */
#define MOPT(s, v)\
	do {								\
		CURLMcode mopt_sta_ = curl_multi_setopt(mhnd,		\
						       (CURLMOPT_##s),	\
						       (v));		\
		if (mopt_sta_ != CURLM_OK) {				\
			syslog(LOG_ERR, "EVCURL: error setting option "	\
			       #s " on curl multi handle: %s\n",	\
			       curl_multi_strerror(mopt_sta_));		\
			exit (1);					\
		}							\
	} while (0)
81
/*
 * Logging helpers.
 *
 * DBGLOG(LEVEL) is a bare `if` prefix: the statement following it runs unless
 * LEVEL is LOG_DEBUG while DebugCurl is 0.
 *
 * The four *_syslog macros wrap that prefix in do { } while (0) so each use
 * is a single statement; an `else` written after one of them can therefore
 * no longer bind to the hidden `if`.
 *
 *   EVCURL_syslog / EVCURLM_syslog  prefix the message with the ID of the
 *       AsyncIO named `IO` in the invoking scope and with CCID; `IO` must be
 *       a valid pointer at the point of use.
 *   CURL_syslog / CURLM_syslog      need no IO; prefix is "CURL: ".
 *
 * The variants without the trailing M take at least one format argument.
 */
#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0))

#define EVCURL_syslog(LEVEL, FORMAT, ...)				\
	do {								\
		DBGLOG (LEVEL) syslog(LEVEL,				\
				      "EVCURL:IO[%ld]CC[%d] " FORMAT,	\
				      IO->ID, CCID, __VA_ARGS__);	\
	} while (0)

#define EVCURLM_syslog(LEVEL, FORMAT)					\
	do {								\
		DBGLOG (LEVEL) syslog(LEVEL,				\
				      "EVCURL:IO[%ld]CC[%d] " FORMAT,	\
				      IO->ID, CCID);			\
	} while (0)

#define CURL_syslog(LEVEL, FORMAT, ...)					\
	do {								\
		DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT,		\
				      __VA_ARGS__);			\
	} while (0)

#define CURLM_syslog(LEVEL, FORMAT)					\
	do {								\
		DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT);		\
	} while (0)
95
96 typedef struct _evcurl_global_data {
97         int magic;
98         CURLM *mhnd;
99         ev_timer timeev;
100         int nrun;
101 } evcurl_global_data;
102
103 ev_async WakeupCurl;
104 evcurl_global_data global;
105
106 static void
107 gotstatus(int nnrun)
108 {
109         CURLMsg *msg;
110         int nmsg;
111
112         global.nrun = nnrun;
113
114         CURLM_syslog(LOG_DEBUG,
115                      "gotstatus(): about to call curl_multi_info_read\n");
116         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
117                 CURL_syslog(LOG_DEBUG,
118                             "got curl multi_info message msg=%d\n",
119                             msg->msg);
120
121                 if (CURLMSG_DONE == msg->msg) {
122                         CURL *chnd;
123                         char *chandle = NULL;
124                         CURLcode sta;
125                         CURLMcode msta;
126                         AsyncIO*IO;
127
128                         chandle = NULL;;
129                         chnd = msg->easy_handle;
130                         sta = curl_easy_getinfo(chnd,
131                                                 CURLINFO_PRIVATE,
132                                                 &chandle);
133                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
134                         if (sta) {
135                                 EVCURL_syslog(LOG_ERR,
136                                               "error asking curl for private"
137                                               " cookie of curl handle: %s\n",
138                                               curl_easy_strerror(sta));
139                                 continue;
140                         }
141                         IO = (AsyncIO *)chandle;
142
143                         IO->Now = ev_now(event_base);
144
145                         ev_io_stop(event_base, &IO->recv_event);
146                         ev_io_stop(event_base, &IO->send_event);
147
148                         sta = msg->data.result;
149                         if (sta) {
150                                 EVCURL_syslog(LOG_ERR,
151                                               "error description: %s\n",
152                                               IO->HttpReq.errdesc);
153                                 EVCURL_syslog(LOG_ERR,
154                                               "error performing request: %s\n",
155                                               curl_easy_strerror(sta));
156                         }
157                         sta = curl_easy_getinfo(chnd,
158                                                 CURLINFO_RESPONSE_CODE,
159                                                 &IO->HttpReq.httpcode);
160                         if (sta)
161                                 EVCURL_syslog(LOG_ERR,
162                                               "error asking curl for "
163                                               "response code from request: %s\n",
164                                               curl_easy_strerror(sta));
165                         EVCURL_syslog(LOG_ERR,
166                                       "http response code was %ld\n",
167                                       (long)IO->HttpReq.httpcode);
168
169
170                         curl_slist_free_all(IO->HttpReq.headers);
171                         msta = curl_multi_remove_handle(global.mhnd, chnd);
172                         if (msta)
173                                 EVCURL_syslog(LOG_ERR,
174                                               "warning problem detaching "
175                                               "completed handle from curl multi: "
176                                               "%s\n",
177                                               curl_multi_strerror(msta));
178
179                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
180
181                         IO->HttpReq.attached = 0;
182                         switch(IO->SendDone(IO))
183                         {
184                         case eDBQuery:
185                                 curl_easy_cleanup(IO->HttpReq.chnd);
186                                 IO->HttpReq.chnd = NULL;
187                                 break;
188                         case eSendDNSQuery:
189                         case eReadDNSReply:
190                         case eConnect:
191                         case eSendReply:
192                         case eSendMore:
193                         case eSendFile:
194                         case eReadMessage:
195                         case eReadMore:
196                         case eReadPayload:
197                         case eReadFile:
198                                 curl_easy_cleanup(IO->HttpReq.chnd);
199                                 IO->HttpReq.chnd = NULL;
200                                 break;
201                         case eTerminateConnection:
202                         case eAbort:
203                                 curl_easy_cleanup(IO->HttpReq.chnd);
204                                 IO->HttpReq.chnd = NULL;
205                                 FreeStrBuf(&IO->HttpReq.ReplyData);
206                                 FreeURL(&IO->ConnectMe);
207                                 RemoveContext(IO->CitContext);
208                                 IO->Terminate(IO);
209                         }
210                 }
211         }
212 }
213
214 static void
215 stepmulti(void *data, curl_socket_t fd, int which)
216 {
217         int running_handles = 0;
218         CURLMcode msta;
219
220         msta = curl_multi_socket_action(global.mhnd,
221                                         fd,
222                                         which,
223                                         &running_handles);
224
225         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
226         if (msta)
227                 CURL_syslog(LOG_ERR,
228                             "error in curl processing events"
229                             "on multi handle, fd %d: %s\n",
230                             (int)fd,
231                             curl_multi_strerror(msta));
232
233         if (global.nrun != running_handles)
234                 gotstatus(running_handles);
235 }
236
237 static void
238 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
239 {
240         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
241         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
242 }
243
244 static void
245 got_in(struct ev_loop *loop, ev_io *ioev, int events)
246 {
247         CURL_syslog(LOG_DEBUG,
248                     "EVCURL: waking up curl for io on fd %d\n",
249                     (int)ioev->fd);
250
251         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
252 }
253
254 static void
255 got_out(struct ev_loop *loop, ev_io *ioev, int events)
256 {
257         CURL_syslog(LOG_DEBUG,
258                     "waking up curl for io on fd %d\n",
259                     (int)ioev->fd);
260
261         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
262 }
263
264 static size_t
265 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
266         AsyncIO *IO = (AsyncIO*) cglobal;
267
268         if (IO->HttpReq.ReplyData == NULL)
269         {
270                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
271         }
272         IO->Now = ev_now(event_base);
273         return CurlFillStrBuf_callback(data,
274                                        size,
275                                        nmemb,
276                                        IO->HttpReq.ReplyData);
277 }
278
279 static int
280 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
281         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
282         evcurl_global_data *global = cglobal;
283         ev_timer_stop(EV_DEFAULT, &global->timeev);
284         if (tblock_ms < 0 || 14000 < tblock_ms)
285                 tblock_ms = 14000;
286         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
287         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
288         curl_multi_perform(global, &global->nrun);
289         return 0;
290 }
291
292 static int
293 gotwatchsock(CURL *easy,
294              curl_socket_t fd,
295              int action,
296              void *cglobal,
297              void *vIO)
298 {
299         evcurl_global_data *global = cglobal;
300         CURLM *mhnd = global->mhnd;
301         char *f;
302         AsyncIO *IO = (AsyncIO*) vIO;
303         CURLcode sta;
304         const char *Action;
305
306         if (IO == NULL) {
307                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
308                 if (sta) {
309                         EVCURL_syslog(LOG_ERR,
310                                       "EVCURL: error asking curl for private "
311                                       "cookie of curl handle: %s\n",
312                                       curl_easy_strerror(sta));
313                         return -1;
314                 }
315                 IO = (AsyncIO *) f;
316                 EVCURL_syslog(LOG_DEBUG,
317                               "EVCURL: got socket for URL: %s\n",
318                               IO->ConnectMe->PlainUrl);
319
320                 if (IO->SendBuf.fd != 0)
321                 {
322                         ev_io_stop(event_base, &IO->recv_event);
323                         ev_io_stop(event_base, &IO->send_event);
324                 }
325                 IO->SendBuf.fd = fd;
326                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
327                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
328                 curl_multi_assign(mhnd, fd, IO);
329         }
330
331         IO->Now = ev_now(event_base);
332
333         Action = "";
334         switch (action)
335         {
336         case CURL_POLL_NONE:
337                 Action = "CURL_POLL_NONE";
338                 break;
339         case CURL_POLL_REMOVE:
340                 Action = "CURL_POLL_REMOVE";
341                 break;
342         case CURL_POLL_IN:
343                 Action = "CURL_POLL_IN";
344                 break;
345         case CURL_POLL_OUT:
346                 Action = "CURL_POLL_OUT";
347                 break;
348         case CURL_POLL_INOUT:
349                 Action = "CURL_POLL_INOUT";
350                 break;
351         }
352
353
354         EVCURL_syslog(LOG_DEBUG,
355                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
356                       (int)fd, Action, action);
357
358         switch (action)
359         {
360         case CURL_POLL_NONE:
361                 EVCURLM_syslog(LOG_ERR,
362                                "called first time "
363                                "to register this sockwatcker\n");
364                 break;
365         case CURL_POLL_REMOVE:
366                 EVCURLM_syslog(LOG_ERR,
367                                "called last time to unregister "
368                                "this sockwatcher\n");
369                 ev_io_stop(event_base, &IO->recv_event);
370                 ev_io_stop(event_base, &IO->send_event);
371                 break;
372         case CURL_POLL_IN:
373                 ev_io_start(event_base, &IO->recv_event);
374                 ev_io_stop(event_base, &IO->send_event);
375                 break;
376         case CURL_POLL_OUT:
377                 ev_io_start(event_base, &IO->send_event);
378                 ev_io_stop(event_base, &IO->recv_event);
379                 break;
380         case CURL_POLL_INOUT:
381                 ev_io_start(event_base, &IO->send_event);
382                 ev_io_start(event_base, &IO->recv_event);
383                 break;
384         }
385         return 0;
386 }
387
388 void curl_init_connectionpool(void)
389 {
390         CURLM *mhnd ;
391
392         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
393         global.timeev.data = (void *)&global;
394         global.nrun = -1;
395         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
396
397         if (sta)
398         {
399                 CURL_syslog(LOG_ERR,
400                             "error initializing curl library: %s\n",
401                             curl_easy_strerror(sta));
402
403                 exit(1);
404         }
405         mhnd = global.mhnd = curl_multi_init();
406         if (!mhnd)
407         {
408                 CURLM_syslog(LOG_ERR,
409                              "error initializing curl multi handle\n");
410                 exit(3);
411         }
412
413         MOPT(SOCKETFUNCTION, &gotwatchsock);
414         MOPT(SOCKETDATA, (void *)&global);
415         MOPT(TIMERFUNCTION, &gotwatchtime);
416         MOPT(TIMERDATA, (void *)&global);
417
418         return;
419 }
420
421 int evcurl_init(AsyncIO *IO)
422 {
423         CURLcode sta;
424         CURL *chnd;
425
426         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
427         IO->HttpReq.attached = 0;
428         chnd = IO->HttpReq.chnd = curl_easy_init();
429         if (!chnd)
430         {
431                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
432                 return 0;
433         }
434
435 #if DEBUG
436         OPT(VERBOSE, (long)1);
437 #endif
438         OPT(NOPROGRESS, 1L);
439
440         OPT(NOSIGNAL, 1L);
441         OPT(FAILONERROR, (long)1);
442         OPT(ENCODING, "");
443         OPT(FOLLOWLOCATION, (long)0);
444         OPT(MAXREDIRS, (long)0);
445         OPT(USERAGENT, CITADEL);
446
447         OPT(TIMEOUT, (long)1800);
448         OPT(LOW_SPEED_LIMIT, (long)64);
449         OPT(LOW_SPEED_TIME, (long)600);
450         OPT(CONNECTTIMEOUT, (long)600);
451         OPT(PRIVATE, (void *)IO);
452
453         OPT(FORBID_REUSE, 1);
454         OPT(WRITEFUNCTION, &gotdata);
455         OPT(WRITEDATA, (void *)IO);
456         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
457
458         if ((!IsEmptyStr(config.c_ip_addr))
459                 && (strcmp(config.c_ip_addr, "*"))
460                 && (strcmp(config.c_ip_addr, "::"))
461                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
462                 )
463         {
464                 OPT(INTERFACE, config.c_ip_addr);
465         }
466
467 #ifdef CURLOPT_HTTP_CONTENT_DECODING
468         OPT(HTTP_CONTENT_DECODING, 1);
469         OPT(ENCODING, "");
470 #endif
471
472         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
473                                                 "Connection: close");
474
475         return 1;
476 }
477
478
479 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
480                                            ev_cleanup *watcher,
481                                            int revents)
482 {
483         CURLMcode msta;
484         AsyncIO *IO = watcher->data;
485         IO->Now = ev_now(event_base);
486         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
487
488         curl_slist_free_all(IO->HttpReq.headers);
489         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
490         if (msta)
491         {
492                 EVCURL_syslog(LOG_ERR,
493                               "EVCURL: warning problem detaching completed handle "
494                               "from curl multi: %s\n",
495                               curl_multi_strerror(msta));
496         }
497
498         curl_easy_cleanup(IO->HttpReq.chnd);
499         IO->HttpReq.chnd = NULL;
500         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
501         ev_io_stop(event_base, &IO->recv_event);
502         ev_io_stop(event_base, &IO->send_event);
503         assert(IO->ShutdownAbort);
504         IO->ShutdownAbort(IO);
505 }
506 eNextState
507 evcurl_handle_start(AsyncIO *IO)
508 {
509         CURLMcode msta;
510         CURLcode sta;
511         CURL *chnd;
512
513         chnd = IO->HttpReq.chnd;
514         EVCURL_syslog(LOG_DEBUG,
515                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
516         OPT(URL, IO->ConnectMe->PlainUrl);
517         if (StrLength(IO->ConnectMe->CurlCreds))
518         {
519                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
520                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
521         }
522         if (StrLength(IO->HttpReq.PostData) > 0)
523         {
524                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
525                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
526
527         }
528         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
529                  (IO->HttpReq.PlainPostData != NULL))
530         {
531                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
532                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
533         }
534         OPT(HTTPHEADER, IO->HttpReq.headers);
535
536         IO->NextState = eConnect;
537         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
538         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
539         if (msta)
540         {
541                 EVCURL_syslog(LOG_ERR,
542                           "EVCURL: error attaching to curl multi handle: %s\n",
543                           curl_multi_strerror(msta));
544         }
545
546         IO->HttpReq.attached = 1;
547         ev_async_send (event_base, &WakeupCurl);
548         ev_cleanup_init(&IO->abort_by_shutdown,
549                         IOcurl_abort_shutdown_callback);
550
551         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
552         return eReadMessage;
553 }
554
555 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
556 {
557         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
558
559         curl_multi_perform(&global, CURL_POLL_NONE);
560 }
561
562 static void evcurl_shutdown (void)
563 {
564         curl_global_cleanup();
565         curl_multi_cleanup(global.mhnd);
566         CURLM_syslog(LOG_DEBUG, "exiting\n");
567 }
/*****************************************************************************
 *                       libev integration                                   *
 *****************************************************************************/
571 /*
572  * client event queue plus its methods.
573  * this currently is the main loop (which may change in some future?)
574  */
575 int evbase_count = 0;
576 int event_add_pipe[2] = {-1, -1};
577 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
578 HashList *QueueEvents = NULL;
579 HashList *InboundEventQueue = NULL;
580 HashList *InboundEventQueues[2] = { NULL, NULL };
581
582 ev_async AddJob;
583 ev_async ExitEventLoop;
584
585 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
586 {
587         ev_tstamp Now;
588         HashList *q;
589         void *v;
590         HashPos*It;
591         long len;
592         const char *Key;
593
594         /* get the control command... */
595         pthread_mutex_lock(&EventQueueMutex);
596
597         if (InboundEventQueues[0] == InboundEventQueue) {
598                 InboundEventQueue = InboundEventQueues[1];
599                 q = InboundEventQueues[0];
600         }
601         else {
602                 InboundEventQueue = InboundEventQueues[0];
603                 q = InboundEventQueues[1];
604         }
605         pthread_mutex_unlock(&EventQueueMutex);
606         Now = ev_now (event_base);
607         It = GetNewHashPos(q, 0);
608         while (GetNextHashPos(q, It, &len, &Key, &v))
609         {
610                 IOAddHandler *h = v;
611                 if (h->IO->ID == 0) {
612                         h->IO->ID = EvIDSource++;
613                 }
614                 if (h->IO->StartIO == 0.0)
615                         h->IO->StartIO = Now;
616                 h->IO->Now = Now;
617                 h->EvAttch(h->IO);
618         }
619         DeleteHashPos(&It);
620         DeleteHashContent(&q);
621         syslog(LOG_DEBUG, "EVENT Q Add done.\n");
622 }
623
624
625 static void EventExitCallback(EV_P_ ev_async *w, int revents)
626 {
627         ev_break(event_base, EVBREAK_ALL);
628
629         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
630 }
631
632
633
634 void InitEventQueue(void)
635 {
636         struct rlimit LimitSet;
637
638         pthread_mutex_init(&EventQueueMutex, NULL);
639
640         if (pipe(event_add_pipe) != 0) {
641                 syslog(LOG_EMERG,
642                        "Unable to create pipe for libev queueing: %s\n",
643                        strerror(errno));
644                 abort();
645         }
646         LimitSet.rlim_cur = 1;
647         LimitSet.rlim_max = 1;
648         setrlimit(event_add_pipe[1], &LimitSet);
649
650         QueueEvents = NewHash(1, Flathash);
651         InboundEventQueues[0] = NewHash(1, Flathash);
652         InboundEventQueues[1] = NewHash(1, Flathash);
653         InboundEventQueue = InboundEventQueues[0];
654 }
655 extern void CtdlDestroyEVCleanupHooks(void);
656
657 extern int EVQShutDown;
658 /*
659  * this thread operates the select() etc. via libev.
660  */
661 void *client_event_thread(void *arg) 
662 {
663         struct CitContext libev_client_CC;
664
665         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
666 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
667         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
668
669         event_base = ev_default_loop (EVFLAG_AUTO);
670         ev_async_init(&AddJob, QueueEventAddCallback);
671         ev_async_start(event_base, &AddJob);
672         ev_async_init(&ExitEventLoop, EventExitCallback);
673         ev_async_start(event_base, &ExitEventLoop);
674         ev_async_init(&WakeupCurl, WakeupCurlCallback);
675         ev_async_start(event_base, &WakeupCurl);
676
677         curl_init_connectionpool();
678
679         ev_run (event_base, 0);
680
681         syslog(LOG_DEBUG, "client_event_thread() exiting\n");
682
683 ///what todo here?      CtdlClearSystemContext();
684         ev_loop_destroy (EV_DEFAULT_UC);
685         DeleteHash(&QueueEvents);
686         InboundEventQueue = NULL;
687         DeleteHash(&InboundEventQueues[0]);
688         DeleteHash(&InboundEventQueues[1]);
689 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
690         evcurl_shutdown();
691         close(event_add_pipe[0]);
692         close(event_add_pipe[1]);
693
694         CtdlDestroyEVCleanupHooks();
695
696         EVQShutDown = 1;        
697         return(NULL);
698 }
699
700 /*----------------------------------------------------------------------------*/
701 /*
702  * DB-Queue; does async bdb operations.
703  * has its own set of handlers.
704  */
705 ev_loop *event_db;
706 int evdb_count = 0;
707 int evdb_add_pipe[2] = {-1, -1};
708 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
709 HashList *DBQueueEvents = NULL;
710 HashList *DBInboundEventQueue = NULL;
711 HashList *DBInboundEventQueues[2] = { NULL, NULL };
712
713 ev_async DBAddJob;
714 ev_async DBExitEventLoop;
715
716 extern void ShutDownDBCLient(AsyncIO *IO);
717
718 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
719 {
720         ev_tstamp Now;
721         HashList *q;
722         void *v;
723         HashPos *It;
724         long len;
725         const char *Key;
726
727         /* get the control command... */
728         pthread_mutex_lock(&DBEventQueueMutex);
729
730         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
731                 DBInboundEventQueue = DBInboundEventQueues[1];
732                 q = DBInboundEventQueues[0];
733         }
734         else {
735                 DBInboundEventQueue = DBInboundEventQueues[0];
736                 q = DBInboundEventQueues[1];
737         }
738         pthread_mutex_unlock(&DBEventQueueMutex);
739
740         Now = ev_now (event_db);
741         It = GetNewHashPos(q, 0);
742         while (GetNextHashPos(q, It, &len, &Key, &v))
743         {
744                 IOAddHandler *h = v;
745                 eNextState rc;
746                 if (h->IO->ID == 0)
747                         h->IO->ID = EvIDSource++;
748                 if (h->IO->StartDB == 0.0)
749                         h->IO->StartDB = Now;
750                 h->IO->Now = Now;
751                 rc = h->EvAttch(h->IO);
752                 switch (rc)
753                 {
754                 case eAbort:
755                         ShutDownDBCLient(h->IO);
756                 default:
757                         break;
758                 }
759         }
760         DeleteHashPos(&It);
761         DeleteHashContent(&q);
762         syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
763 }
764
765
766 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
767 {
768         syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
769         ev_break(event_db, EVBREAK_ALL);
770 }
771
772
773
774 void DBInitEventQueue(void)
775 {
776         struct rlimit LimitSet;
777
778         pthread_mutex_init(&DBEventQueueMutex, NULL);
779
780         if (pipe(evdb_add_pipe) != 0) {
781                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
782                 abort();
783         }
784         LimitSet.rlim_cur = 1;
785         LimitSet.rlim_max = 1;
786         setrlimit(evdb_add_pipe[1], &LimitSet);
787
788         DBQueueEvents = NewHash(1, Flathash);
789         DBInboundEventQueues[0] = NewHash(1, Flathash);
790         DBInboundEventQueues[1] = NewHash(1, Flathash);
791         DBInboundEventQueue = DBInboundEventQueues[0];
792 }
793
794 /*
795  * this thread operates writing to the message database via libev.
796  */
797 void *db_event_thread(void *arg)
798 {
799         struct CitContext libev_msg_CC;
800
801         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
802 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
803         syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
804
805         event_db = ev_loop_new (EVFLAG_AUTO);
806
807         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
808         ev_async_start(event_db, &DBAddJob);
809         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
810         ev_async_start(event_db, &DBExitEventLoop);
811
812         ev_run (event_db, 0);
813
814         syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
815
816 //// what to do here?   CtdlClearSystemContext();
817         ev_loop_destroy (event_db);
818
819         DeleteHash(&DBQueueEvents);
820         DBInboundEventQueue = NULL;
821         DeleteHash(&DBInboundEventQueues[0]);
822         DeleteHash(&DBInboundEventQueues[1]);
823
824         close(evdb_add_pipe[0]);
825         close(evdb_add_pipe[1]);
826 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
827
828         return(NULL);
829 }
830
831 void ShutDownEventQueues(void)
832 {
833         syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
834
835         pthread_mutex_lock(&DBEventQueueMutex);
836         ev_async_send (event_db, &DBExitEventLoop);
837         pthread_mutex_unlock(&DBEventQueueMutex);
838
839         pthread_mutex_lock(&EventQueueMutex);
840         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
841         pthread_mutex_unlock(&EventQueueMutex);
842 }
843
844 void DebugEventloopEnable(void)
845 {
846         DebugEventLoop = 1;
847 }
848 void DebugEventloopBacktraceEnable(void)
849 {
850         DebugEventLoopBacktrace = 1;
851 }
852
853 void DebugCurlEnable(void)
854 {
855         DebugCurl = 1;
856 }
857
858 CTDL_MODULE_INIT(event_client)
859 {
860         if (!threading)
861         {
862                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable);
863                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable);
864                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable);
865                 InitEventQueue();
866                 DBInitEventQueue();
867                 CtdlThreadCreate(client_event_thread);
868                 CtdlThreadCreate(db_event_thread);
869         }
870         return "event";
871 }