b0d9a3c3257642a1a3cb11e553708f40f7931fc8
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63 int DebugEventLoop = 0;
64 int DebugEventLoopBacktrace = 0;
65 int DebugCurl = 0;
66
67 long EvIDSource = 1;
68 /*****************************************************************************
69  *                   libevent / curl integration                             *
70  *****************************************************************************/
71 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0))
72
73 #define EVCURL_syslog(LEVEL, FORMAT, ...)                               \
74         DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT,    \
75                               IO->ID, CCID, __VA_ARGS__)
76
77 #define EVCURLM_syslog(LEVEL, FORMAT)                                   \
78         DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT,    \
79                               IO->ID, CCID)
80
81 #define CURL_syslog(LEVEL, FORMAT, ...)                                 \
82         DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__)
83
84 #define CURLM_syslog(LEVEL, FORMAT)                     \
85         DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT)
86
87 #define MOPT(s, v)                                                      \
88         do {                                                            \
89                 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
90                 if (sta) {                                              \
91                         EVQ_syslog(LOG_ERR, "error setting option "     \
92                                #s " on curl multi handle: %s\n",        \
93                                curl_easy_strerror(sta));                \
94                         exit (1);                                       \
95                 }                                                       \
96         } while (0)
97
98
99 typedef struct _evcurl_global_data {
100         int magic;
101         CURLM *mhnd;
102         ev_timer timeev;
103         int nrun;
104 } evcurl_global_data;
105
106 ev_async WakeupCurl;
107 evcurl_global_data global;
108
109 static void
110 gotstatus(int nnrun)
111 {
112         CURLMsg *msg;
113         int nmsg;
114
115         global.nrun = nnrun;
116
117         CURLM_syslog(LOG_DEBUG,
118                      "gotstatus(): about to call curl_multi_info_read\n");
119         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
120                 CURL_syslog(LOG_DEBUG,
121                             "got curl multi_info message msg=%d\n",
122                             msg->msg);
123
124                 if (CURLMSG_DONE == msg->msg) {
125                         CURL *chnd;
126                         char *chandle = NULL;
127                         CURLcode sta;
128                         CURLMcode msta;
129                         AsyncIO*IO;
130
131                         chandle = NULL;;
132                         chnd = msg->easy_handle;
133                         sta = curl_easy_getinfo(chnd,
134                                                 CURLINFO_PRIVATE,
135                                                 &chandle);
136                         if (sta) {
137                                 EVCURL_syslog(LOG_ERR,
138                                               "error asking curl for private"
139                                               " cookie of curl handle: %s\n",
140                                               curl_easy_strerror(sta));
141                                 continue;
142                         }
143                         IO = (AsyncIO *)chandle;
144
145                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
146
147                         IO->Now = ev_now(event_base);
148
149                         ev_io_stop(event_base, &IO->recv_event);
150                         ev_io_stop(event_base, &IO->send_event);
151
152                         sta = msg->data.result;
153                         if (sta) {
154                                 EVCURL_syslog(LOG_ERR,
155                                               "error description: %s\n",
156                                               IO->HttpReq.errdesc);
157                                 EVCURL_syslog(LOG_ERR,
158                                               "error performing request: %s\n",
159                                               curl_easy_strerror(sta));
160                         }
161                         sta = curl_easy_getinfo(chnd,
162                                                 CURLINFO_RESPONSE_CODE,
163                                                 &IO->HttpReq.httpcode);
164                         if (sta)
165                                 EVCURL_syslog(LOG_ERR,
166                                               "error asking curl for "
167                                               "response code from request: %s\n",
168                                               curl_easy_strerror(sta));
169                         EVCURL_syslog(LOG_ERR,
170                                       "http response code was %ld\n",
171                                       (long)IO->HttpReq.httpcode);
172
173
174                         curl_slist_free_all(IO->HttpReq.headers);
175                         msta = curl_multi_remove_handle(global.mhnd, chnd);
176                         if (msta)
177                                 EVCURL_syslog(LOG_ERR,
178                                               "warning problem detaching "
179                                               "completed handle from curl multi: "
180                                               "%s\n",
181                                               curl_multi_strerror(msta));
182
183                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
184
185                         IO->HttpReq.attached = 0;
186                         switch(IO->SendDone(IO))
187                         {
188                         case eDBQuery:
189                                 curl_easy_cleanup(IO->HttpReq.chnd);
190                                 IO->HttpReq.chnd = NULL;
191                                 break;
192                         case eSendDNSQuery:
193                         case eReadDNSReply:
194                         case eConnect:
195                         case eSendReply:
196                         case eSendMore:
197                         case eSendFile:
198                         case eReadMessage:
199                         case eReadMore:
200                         case eReadPayload:
201                         case eReadFile:
202                                 curl_easy_cleanup(IO->HttpReq.chnd);
203                                 IO->HttpReq.chnd = NULL;
204                                 break;
205                         case eTerminateConnection:
206                         case eAbort:
207                                 curl_easy_cleanup(IO->HttpReq.chnd);
208                                 IO->HttpReq.chnd = NULL;
209                                 FreeStrBuf(&IO->HttpReq.ReplyData);
210                                 FreeURL(&IO->ConnectMe);
211                                 RemoveContext(IO->CitContext);
212                                 IO->Terminate(IO);
213                         }
214                 }
215         }
216 }
217
218 static void
219 stepmulti(void *data, curl_socket_t fd, int which)
220 {
221         int running_handles = 0;
222         CURLMcode msta;
223
224         msta = curl_multi_socket_action(global.mhnd,
225                                         fd,
226                                         which,
227                                         &running_handles);
228
229         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
230         if (msta)
231                 CURL_syslog(LOG_ERR,
232                             "error in curl processing events"
233                             "on multi handle, fd %d: %s\n",
234                             (int)fd,
235                             curl_multi_strerror(msta));
236
237         if (global.nrun != running_handles)
238                 gotstatus(running_handles);
239 }
240
241 static void
242 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
243 {
244         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
245         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
246 }
247
248 static void
249 got_in(struct ev_loop *loop, ev_io *ioev, int events)
250 {
251         CURL_syslog(LOG_DEBUG,
252                     "EVCURL: waking up curl for io on fd %d\n",
253                     (int)ioev->fd);
254
255         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
256 }
257
258 static void
259 got_out(struct ev_loop *loop, ev_io *ioev, int events)
260 {
261         CURL_syslog(LOG_DEBUG,
262                     "waking up curl for io on fd %d\n",
263                     (int)ioev->fd);
264
265         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
266 }
267
268 static size_t
269 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
270         AsyncIO *IO = (AsyncIO*) cglobal;
271
272         if (IO->HttpReq.ReplyData == NULL)
273         {
274                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
275         }
276         IO->Now = ev_now(event_base);
277         return CurlFillStrBuf_callback(data,
278                                        size,
279                                        nmemb,
280                                        IO->HttpReq.ReplyData);
281 }
282
283 static int
284 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
285         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
286         evcurl_global_data *global = cglobal;
287         ev_timer_stop(EV_DEFAULT, &global->timeev);
288         if (tblock_ms < 0 || 14000 < tblock_ms)
289                 tblock_ms = 14000;
290         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
291         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
292         curl_multi_perform(global, &global->nrun);
293         return 0;
294 }
295
296 static int
297 gotwatchsock(CURL *easy,
298              curl_socket_t fd,
299              int action,
300              void *cglobal,
301              void *vIO)
302 {
303         evcurl_global_data *global = cglobal;
304         CURLM *mhnd = global->mhnd;
305         char *f;
306         AsyncIO *IO = (AsyncIO*) vIO;
307         CURLcode sta;
308         const char *Action;
309
310         if (IO == NULL) {
311                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
312                 if (sta) {
313                         EVCURL_syslog(LOG_ERR,
314                                       "EVCURL: error asking curl for private "
315                                       "cookie of curl handle: %s\n",
316                                       curl_easy_strerror(sta));
317                         return -1;
318                 }
319                 IO = (AsyncIO *) f;
320                 EVCURL_syslog(LOG_DEBUG,
321                               "EVCURL: got socket for URL: %s\n",
322                               IO->ConnectMe->PlainUrl);
323
324                 if (IO->SendBuf.fd != 0)
325                 {
326                         ev_io_stop(event_base, &IO->recv_event);
327                         ev_io_stop(event_base, &IO->send_event);
328                 }
329                 IO->SendBuf.fd = fd;
330                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
331                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
332                 curl_multi_assign(mhnd, fd, IO);
333         }
334
335         IO->Now = ev_now(event_base);
336
337         Action = "";
338         switch (action)
339         {
340         case CURL_POLL_NONE:
341                 Action = "CURL_POLL_NONE";
342                 break;
343         case CURL_POLL_REMOVE:
344                 Action = "CURL_POLL_REMOVE";
345                 break;
346         case CURL_POLL_IN:
347                 Action = "CURL_POLL_IN";
348                 break;
349         case CURL_POLL_OUT:
350                 Action = "CURL_POLL_OUT";
351                 break;
352         case CURL_POLL_INOUT:
353                 Action = "CURL_POLL_INOUT";
354                 break;
355         }
356
357
358         EVCURL_syslog(LOG_DEBUG,
359                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
360                       (int)fd, Action, action);
361
362         switch (action)
363         {
364         case CURL_POLL_NONE:
365                 EVCURLM_syslog(LOG_DEBUG,
366                                "called first time "
367                                "to register this sockwatcker\n");
368                 break;
369         case CURL_POLL_REMOVE:
370                 EVCURLM_syslog(LOG_DEBUG,
371                                "called last time to unregister "
372                                "this sockwatcher\n");
373                 ev_io_stop(event_base, &IO->recv_event);
374                 ev_io_stop(event_base, &IO->send_event);
375                 break;
376         case CURL_POLL_IN:
377                 ev_io_start(event_base, &IO->recv_event);
378                 ev_io_stop(event_base, &IO->send_event);
379                 break;
380         case CURL_POLL_OUT:
381                 ev_io_start(event_base, &IO->send_event);
382                 ev_io_stop(event_base, &IO->recv_event);
383                 break;
384         case CURL_POLL_INOUT:
385                 ev_io_start(event_base, &IO->send_event);
386                 ev_io_start(event_base, &IO->recv_event);
387                 break;
388         }
389         return 0;
390 }
391
392 void curl_init_connectionpool(void)
393 {
394         CURLM *mhnd ;
395
396         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
397         global.timeev.data = (void *)&global;
398         global.nrun = -1;
399         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
400
401         if (sta)
402         {
403                 CURL_syslog(LOG_ERR,
404                             "error initializing curl library: %s\n",
405                             curl_easy_strerror(sta));
406
407                 exit(1);
408         }
409         mhnd = global.mhnd = curl_multi_init();
410         if (!mhnd)
411         {
412                 CURLM_syslog(LOG_ERR,
413                              "error initializing curl multi handle\n");
414                 exit(3);
415         }
416
417         MOPT(SOCKETFUNCTION, &gotwatchsock);
418         MOPT(SOCKETDATA, (void *)&global);
419         MOPT(TIMERFUNCTION, &gotwatchtime);
420         MOPT(TIMERDATA, (void *)&global);
421
422         return;
423 }
424
425 int evcurl_init(AsyncIO *IO)
426 {
427         CURLcode sta;
428         CURL *chnd;
429
430         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
431         IO->HttpReq.attached = 0;
432         chnd = IO->HttpReq.chnd = curl_easy_init();
433         if (!chnd)
434         {
435                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
436                 return 0;
437         }
438
439 #if DEBUG
440         OPT(VERBOSE, (long)1);
441 #endif
442         OPT(NOPROGRESS, 1L);
443
444         OPT(NOSIGNAL, 1L);
445         OPT(FAILONERROR, (long)1);
446         OPT(ENCODING, "");
447         OPT(FOLLOWLOCATION, (long)0);
448         OPT(MAXREDIRS, (long)0);
449         OPT(USERAGENT, CITADEL);
450
451         OPT(TIMEOUT, (long)1800);
452         OPT(LOW_SPEED_LIMIT, (long)64);
453         OPT(LOW_SPEED_TIME, (long)600);
454         OPT(CONNECTTIMEOUT, (long)600);
455         OPT(PRIVATE, (void *)IO);
456
457         OPT(FORBID_REUSE, 1);
458         OPT(WRITEFUNCTION, &gotdata);
459         OPT(WRITEDATA, (void *)IO);
460         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
461
462         if ((!IsEmptyStr(config.c_ip_addr))
463                 && (strcmp(config.c_ip_addr, "*"))
464                 && (strcmp(config.c_ip_addr, "::"))
465                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
466                 )
467         {
468                 OPT(INTERFACE, config.c_ip_addr);
469         }
470
471 #ifdef CURLOPT_HTTP_CONTENT_DECODING
472         OPT(HTTP_CONTENT_DECODING, 1);
473         OPT(ENCODING, "");
474 #endif
475
476         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
477                                                 "Connection: close");
478
479         return 1;
480 }
481
482
483 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
484                                            ev_cleanup *watcher,
485                                            int revents)
486 {
487         CURLMcode msta;
488         AsyncIO *IO = watcher->data;
489         IO->Now = ev_now(event_base);
490         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
491
492         curl_slist_free_all(IO->HttpReq.headers);
493         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
494         if (msta)
495         {
496                 EVCURL_syslog(LOG_ERR,
497                               "EVCURL: warning problem detaching completed handle "
498                               "from curl multi: %s\n",
499                               curl_multi_strerror(msta));
500         }
501
502         curl_easy_cleanup(IO->HttpReq.chnd);
503         IO->HttpReq.chnd = NULL;
504         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
505         ev_io_stop(event_base, &IO->recv_event);
506         ev_io_stop(event_base, &IO->send_event);
507         assert(IO->ShutdownAbort);
508         IO->ShutdownAbort(IO);
509 }
510 eNextState
511 evcurl_handle_start(AsyncIO *IO)
512 {
513         CURLMcode msta;
514         CURLcode sta;
515         CURL *chnd;
516
517         chnd = IO->HttpReq.chnd;
518         EVCURL_syslog(LOG_DEBUG,
519                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
520         OPT(URL, IO->ConnectMe->PlainUrl);
521         if (StrLength(IO->ConnectMe->CurlCreds))
522         {
523                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
524                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
525         }
526         if (StrLength(IO->HttpReq.PostData) > 0)
527         {
528                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
529                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
530
531         }
532         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
533                  (IO->HttpReq.PlainPostData != NULL))
534         {
535                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
536                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
537         }
538         OPT(HTTPHEADER, IO->HttpReq.headers);
539
540         IO->NextState = eConnect;
541         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
542         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
543         if (msta)
544         {
545                 EVCURL_syslog(LOG_ERR,
546                           "EVCURL: error attaching to curl multi handle: %s\n",
547                           curl_multi_strerror(msta));
548         }
549
550         IO->HttpReq.attached = 1;
551         ev_async_send (event_base, &WakeupCurl);
552         ev_cleanup_init(&IO->abort_by_shutdown,
553                         IOcurl_abort_shutdown_callback);
554
555         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
556         return eReadMessage;
557 }
558
559 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
560 {
561         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
562
563         curl_multi_perform(&global, CURL_POLL_NONE);
564 }
565
566 static void evcurl_shutdown (void)
567 {
568         curl_global_cleanup();
569         curl_multi_cleanup(global.mhnd);
570         CURLM_syslog(LOG_DEBUG, "exiting\n");
571 }
572 /*****************************************************************************
573  *                       libevent integration                                *
574  *****************************************************************************/
575 /*
576  * client event queue plus its methods.
577  * this currently is the main loop (which may change in some future?)
578  */
579 int evbase_count = 0;
580 int event_add_pipe[2] = {-1, -1};
581 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
582 HashList *QueueEvents = NULL;
583 HashList *InboundEventQueue = NULL;
584 HashList *InboundEventQueues[2] = { NULL, NULL };
585
586 ev_async AddJob;
587 ev_async ExitEventLoop;
588
589 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
590 {
591         ev_tstamp Now;
592         HashList *q;
593         void *v;
594         HashPos*It;
595         long len;
596         const char *Key;
597
598         /* get the control command... */
599         pthread_mutex_lock(&EventQueueMutex);
600
601         if (InboundEventQueues[0] == InboundEventQueue) {
602                 InboundEventQueue = InboundEventQueues[1];
603                 q = InboundEventQueues[0];
604         }
605         else {
606                 InboundEventQueue = InboundEventQueues[0];
607                 q = InboundEventQueues[1];
608         }
609         pthread_mutex_unlock(&EventQueueMutex);
610         Now = ev_now (event_base);
611         It = GetNewHashPos(q, 0);
612         while (GetNextHashPos(q, It, &len, &Key, &v))
613         {
614                 IOAddHandler *h = v;
615                 if (h->IO->ID == 0) {
616                         h->IO->ID = EvIDSource++;
617                 }
618                 if (h->IO->StartIO == 0.0)
619                         h->IO->StartIO = Now;
620                 h->IO->Now = Now;
621                 h->EvAttch(h->IO);
622         }
623         DeleteHashPos(&It);
624         DeleteHashContent(&q);
625         EVQM_syslog(LOG_DEBUG, "EVENT Q Add done.\n");
626 }
627
628
629 static void EventExitCallback(EV_P_ ev_async *w, int revents)
630 {
631         ev_break(event_base, EVBREAK_ALL);
632
633         EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
634 }
635
636
637
638 void InitEventQueue(void)
639 {
640         struct rlimit LimitSet;
641
642         pthread_mutex_init(&EventQueueMutex, NULL);
643
644         if (pipe(event_add_pipe) != 0) {
645                 syslog(LOG_EMERG,
646                        "Unable to create pipe for libev queueing: %s\n",
647                        strerror(errno));
648                 abort();
649         }
650         LimitSet.rlim_cur = 1;
651         LimitSet.rlim_max = 1;
652         setrlimit(event_add_pipe[1], &LimitSet);
653
654         QueueEvents = NewHash(1, Flathash);
655         InboundEventQueues[0] = NewHash(1, Flathash);
656         InboundEventQueues[1] = NewHash(1, Flathash);
657         InboundEventQueue = InboundEventQueues[0];
658 }
659 extern void CtdlDestroyEVCleanupHooks(void);
660
661 extern int EVQShutDown;
662 /*
663  * this thread operates the select() etc. via libev.
664  */
665 void *client_event_thread(void *arg) 
666 {
667         struct CitContext libev_client_CC;
668
669         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
670 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
671         EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");
672
673         event_base = ev_default_loop (EVFLAG_AUTO);
674         ev_async_init(&AddJob, QueueEventAddCallback);
675         ev_async_start(event_base, &AddJob);
676         ev_async_init(&ExitEventLoop, EventExitCallback);
677         ev_async_start(event_base, &ExitEventLoop);
678         ev_async_init(&WakeupCurl, WakeupCurlCallback);
679         ev_async_start(event_base, &WakeupCurl);
680
681         curl_init_connectionpool();
682
683         ev_run (event_base, 0);
684
685         EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
686
687 ///what todo here?      CtdlClearSystemContext();
688         ev_loop_destroy (EV_DEFAULT_UC);
689         DeleteHash(&QueueEvents);
690         InboundEventQueue = NULL;
691         DeleteHash(&InboundEventQueues[0]);
692         DeleteHash(&InboundEventQueues[1]);
693 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
694         evcurl_shutdown();
695         close(event_add_pipe[0]);
696         close(event_add_pipe[1]);
697
698         CtdlDestroyEVCleanupHooks();
699
700         EVQShutDown = 1;        
701         return(NULL);
702 }
703
704 /*----------------------------------------------------------------------------*/
705 /*
706  * DB-Queue; does async bdb operations.
707  * has its own set of handlers.
708  */
709 ev_loop *event_db;
710 int evdb_count = 0;
711 int evdb_add_pipe[2] = {-1, -1};
712 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
713 HashList *DBQueueEvents = NULL;
714 HashList *DBInboundEventQueue = NULL;
715 HashList *DBInboundEventQueues[2] = { NULL, NULL };
716
717 ev_async DBAddJob;
718 ev_async DBExitEventLoop;
719
720 extern void ShutDownDBCLient(AsyncIO *IO);
721
722 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
723 {
724         ev_tstamp Now;
725         HashList *q;
726         void *v;
727         HashPos *It;
728         long len;
729         const char *Key;
730
731         /* get the control command... */
732         pthread_mutex_lock(&DBEventQueueMutex);
733
734         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
735                 DBInboundEventQueue = DBInboundEventQueues[1];
736                 q = DBInboundEventQueues[0];
737         }
738         else {
739                 DBInboundEventQueue = DBInboundEventQueues[0];
740                 q = DBInboundEventQueues[1];
741         }
742         pthread_mutex_unlock(&DBEventQueueMutex);
743
744         Now = ev_now (event_db);
745         It = GetNewHashPos(q, 0);
746         while (GetNextHashPos(q, It, &len, &Key, &v))
747         {
748                 IOAddHandler *h = v;
749                 eNextState rc;
750                 if (h->IO->ID == 0)
751                         h->IO->ID = EvIDSource++;
752                 if (h->IO->StartDB == 0.0)
753                         h->IO->StartDB = Now;
754                 h->IO->Now = Now;
755                 rc = h->EvAttch(h->IO);
756                 switch (rc)
757                 {
758                 case eAbort:
759                         ShutDownDBCLient(h->IO);
760                 default:
761                         break;
762                 }
763         }
764         DeleteHashPos(&It);
765         DeleteHashContent(&q);
766         EVQM_syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
767 }
768
769
770 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
771 {
772         EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
773         ev_break(event_db, EVBREAK_ALL);
774 }
775
776
777
778 void DBInitEventQueue(void)
779 {
780         struct rlimit LimitSet;
781
782         pthread_mutex_init(&DBEventQueueMutex, NULL);
783
784         if (pipe(evdb_add_pipe) != 0) {
785                 EVQ_syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
786                 abort();
787         }
788         LimitSet.rlim_cur = 1;
789         LimitSet.rlim_max = 1;
790         setrlimit(evdb_add_pipe[1], &LimitSet);
791
792         DBQueueEvents = NewHash(1, Flathash);
793         DBInboundEventQueues[0] = NewHash(1, Flathash);
794         DBInboundEventQueues[1] = NewHash(1, Flathash);
795         DBInboundEventQueue = DBInboundEventQueues[0];
796 }
797
798 /*
799  * this thread operates writing to the message database via libev.
800  */
801 void *db_event_thread(void *arg)
802 {
803         struct CitContext libev_msg_CC;
804
805         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
806 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
807         EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
808
809         event_db = ev_loop_new (EVFLAG_AUTO);
810
811         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
812         ev_async_start(event_db, &DBAddJob);
813         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
814         ev_async_start(event_db, &DBExitEventLoop);
815
816         ev_run (event_db, 0);
817
818         EVQM_syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
819
820 //// what to do here?   CtdlClearSystemContext();
821         ev_loop_destroy (event_db);
822
823         DeleteHash(&DBQueueEvents);
824         DBInboundEventQueue = NULL;
825         DeleteHash(&DBInboundEventQueues[0]);
826         DeleteHash(&DBInboundEventQueues[1]);
827
828         close(evdb_add_pipe[0]);
829         close(evdb_add_pipe[1]);
830 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
831
832         return(NULL);
833 }
834
835 void ShutDownEventQueues(void)
836 {
837         EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
838
839         pthread_mutex_lock(&DBEventQueueMutex);
840         ev_async_send (event_db, &DBExitEventLoop);
841         pthread_mutex_unlock(&DBEventQueueMutex);
842
843         pthread_mutex_lock(&EventQueueMutex);
844         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
845         pthread_mutex_unlock(&EventQueueMutex);
846 }
847
848 void DebugEventloopEnable(const int n)
849 {
850         DebugEventLoop = n;
851 }
852 void DebugEventloopBacktraceEnable(const int n)
853 {
854         DebugEventLoopBacktrace = n;
855 }
856
857 void DebugCurlEnable(const int n)
858 {
859         DebugCurl = n;
860 }
861
862 CTDL_MODULE_INIT(event_client)
863 {
864         if (!threading)
865         {
866                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
867                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
868                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
869                 InitEventQueue();
870                 DBInitEventQueue();
871                 CtdlThreadCreate(client_event_thread);
872                 CtdlThreadCreate(db_event_thread);
873         }
874         return "event";
875 }