7761b662e6a50da621a63a01611f3e14ea9d03f4
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63 int DebugEventLoop = 0;
64 int DebugEventLoopBacktrace = 0;
65 int DebugCurl = 0;
66
67 long EvIDSource = 1;
68 /*****************************************************************************
69  *                   libevent / curl integration                             *
70  *****************************************************************************/
/*
 * Logging helpers for the curl integration.
 *
 * DBGLOG() guards a single following statement: LOG_DEBUG messages are
 * dropped unless DebugCurl is non-zero, every other level always passes.
 * It is spelled "if (skip) { } else" so that a logging macro used as the
 * body of an unbraced if/else cannot capture the caller's else branch.
 */
#define DBGLOG(LEVEL)                                                   \
        if (((LEVEL) == LOG_DEBUG) && (DebugCurl == 0)) { } else

/* Log with IO/context prefix; `IO` must be a valid pointer at the call site. */
#define EVCURL_syslog(LEVEL, FORMAT, ...)                               \
        DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT,    \
                              IO->ID, CCID, __VA_ARGS__)

/* Same as EVCURL_syslog() for messages without format arguments. */
#define EVCURLM_syslog(LEVEL, FORMAT)                                   \
        DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT,    \
                              IO->ID, CCID)

/* Log without any IO context (usable where no AsyncIO is at hand). */
#define CURL_syslog(LEVEL, FORMAT, ...)                                 \
        DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__)

/* Same as CURL_syslog() for messages without format arguments. */
#define CURLM_syslog(LEVEL, FORMAT)                     \
        DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT)

/*
 * Set option CURLMOPT_<s> on the multi handle `mhnd`, storing the result in
 * `sta` (both must be in scope at the call site).  The process exits when
 * libcurl rejects the option.  curl_multi_setopt() reports a CURLMcode, so
 * the text has to come from curl_multi_strerror(), not curl_easy_strerror().
 */
#define MOPT(s, v)                                                      \
        do {                                                            \
                sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
                if (sta) {                                              \
                        EVQ_syslog(LOG_ERR, "error setting option "     \
                               #s " on curl multi handle: %s\n",        \
                               curl_multi_strerror(sta));               \
                        exit (1);                                       \
                }                                                       \
        } while (0)
97
98
99 typedef struct _evcurl_global_data {
100         int magic;
101         CURLM *mhnd;
102         ev_timer timeev;
103         int nrun;
104 } evcurl_global_data;
105
106 ev_async WakeupCurl;
107 evcurl_global_data global;
108
109 static void
110 gotstatus(int nnrun)
111 {
112         CURLMsg *msg;
113         int nmsg;
114
115         global.nrun = nnrun;
116
117         CURLM_syslog(LOG_DEBUG,
118                      "gotstatus(): about to call curl_multi_info_read\n");
119         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
120                 CURL_syslog(LOG_DEBUG,
121                             "got curl multi_info message msg=%d\n",
122                             msg->msg);
123
124                 if (CURLMSG_DONE == msg->msg) {
125                         CURL *chnd;
126                         char *chandle = NULL;
127                         CURLcode sta;
128                         CURLMcode msta;
129                         AsyncIO*IO;
130
131                         chandle = NULL;;
132                         chnd = msg->easy_handle;
133                         sta = curl_easy_getinfo(chnd,
134                                                 CURLINFO_PRIVATE,
135                                                 &chandle);
136                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
137                         if (sta) {
138                                 EVCURL_syslog(LOG_ERR,
139                                               "error asking curl for private"
140                                               " cookie of curl handle: %s\n",
141                                               curl_easy_strerror(sta));
142                                 continue;
143                         }
144                         IO = (AsyncIO *)chandle;
145
146                         IO->Now = ev_now(event_base);
147
148                         ev_io_stop(event_base, &IO->recv_event);
149                         ev_io_stop(event_base, &IO->send_event);
150
151                         sta = msg->data.result;
152                         if (sta) {
153                                 EVCURL_syslog(LOG_ERR,
154                                               "error description: %s\n",
155                                               IO->HttpReq.errdesc);
156                                 EVCURL_syslog(LOG_ERR,
157                                               "error performing request: %s\n",
158                                               curl_easy_strerror(sta));
159                         }
160                         sta = curl_easy_getinfo(chnd,
161                                                 CURLINFO_RESPONSE_CODE,
162                                                 &IO->HttpReq.httpcode);
163                         if (sta)
164                                 EVCURL_syslog(LOG_ERR,
165                                               "error asking curl for "
166                                               "response code from request: %s\n",
167                                               curl_easy_strerror(sta));
168                         EVCURL_syslog(LOG_ERR,
169                                       "http response code was %ld\n",
170                                       (long)IO->HttpReq.httpcode);
171
172
173                         curl_slist_free_all(IO->HttpReq.headers);
174                         msta = curl_multi_remove_handle(global.mhnd, chnd);
175                         if (msta)
176                                 EVCURL_syslog(LOG_ERR,
177                                               "warning problem detaching "
178                                               "completed handle from curl multi: "
179                                               "%s\n",
180                                               curl_multi_strerror(msta));
181
182                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
183
184                         IO->HttpReq.attached = 0;
185                         switch(IO->SendDone(IO))
186                         {
187                         case eDBQuery:
188                                 curl_easy_cleanup(IO->HttpReq.chnd);
189                                 IO->HttpReq.chnd = NULL;
190                                 break;
191                         case eSendDNSQuery:
192                         case eReadDNSReply:
193                         case eConnect:
194                         case eSendReply:
195                         case eSendMore:
196                         case eSendFile:
197                         case eReadMessage:
198                         case eReadMore:
199                         case eReadPayload:
200                         case eReadFile:
201                                 curl_easy_cleanup(IO->HttpReq.chnd);
202                                 IO->HttpReq.chnd = NULL;
203                                 break;
204                         case eTerminateConnection:
205                         case eAbort:
206                                 curl_easy_cleanup(IO->HttpReq.chnd);
207                                 IO->HttpReq.chnd = NULL;
208                                 FreeStrBuf(&IO->HttpReq.ReplyData);
209                                 FreeURL(&IO->ConnectMe);
210                                 RemoveContext(IO->CitContext);
211                                 IO->Terminate(IO);
212                         }
213                 }
214         }
215 }
216
217 static void
218 stepmulti(void *data, curl_socket_t fd, int which)
219 {
220         int running_handles = 0;
221         CURLMcode msta;
222
223         msta = curl_multi_socket_action(global.mhnd,
224                                         fd,
225                                         which,
226                                         &running_handles);
227
228         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
229         if (msta)
230                 CURL_syslog(LOG_ERR,
231                             "error in curl processing events"
232                             "on multi handle, fd %d: %s\n",
233                             (int)fd,
234                             curl_multi_strerror(msta));
235
236         if (global.nrun != running_handles)
237                 gotstatus(running_handles);
238 }
239
240 static void
241 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
242 {
243         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
244         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
245 }
246
247 static void
248 got_in(struct ev_loop *loop, ev_io *ioev, int events)
249 {
250         CURL_syslog(LOG_DEBUG,
251                     "EVCURL: waking up curl for io on fd %d\n",
252                     (int)ioev->fd);
253
254         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
255 }
256
257 static void
258 got_out(struct ev_loop *loop, ev_io *ioev, int events)
259 {
260         CURL_syslog(LOG_DEBUG,
261                     "waking up curl for io on fd %d\n",
262                     (int)ioev->fd);
263
264         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
265 }
266
267 static size_t
268 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
269         AsyncIO *IO = (AsyncIO*) cglobal;
270
271         if (IO->HttpReq.ReplyData == NULL)
272         {
273                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
274         }
275         IO->Now = ev_now(event_base);
276         return CurlFillStrBuf_callback(data,
277                                        size,
278                                        nmemb,
279                                        IO->HttpReq.ReplyData);
280 }
281
282 static int
283 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
284         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
285         evcurl_global_data *global = cglobal;
286         ev_timer_stop(EV_DEFAULT, &global->timeev);
287         if (tblock_ms < 0 || 14000 < tblock_ms)
288                 tblock_ms = 14000;
289         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
290         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
291         curl_multi_perform(global, &global->nrun);
292         return 0;
293 }
294
295 static int
296 gotwatchsock(CURL *easy,
297              curl_socket_t fd,
298              int action,
299              void *cglobal,
300              void *vIO)
301 {
302         evcurl_global_data *global = cglobal;
303         CURLM *mhnd = global->mhnd;
304         char *f;
305         AsyncIO *IO = (AsyncIO*) vIO;
306         CURLcode sta;
307         const char *Action;
308
309         if (IO == NULL) {
310                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
311                 if (sta) {
312                         EVCURL_syslog(LOG_ERR,
313                                       "EVCURL: error asking curl for private "
314                                       "cookie of curl handle: %s\n",
315                                       curl_easy_strerror(sta));
316                         return -1;
317                 }
318                 IO = (AsyncIO *) f;
319                 EVCURL_syslog(LOG_DEBUG,
320                               "EVCURL: got socket for URL: %s\n",
321                               IO->ConnectMe->PlainUrl);
322
323                 if (IO->SendBuf.fd != 0)
324                 {
325                         ev_io_stop(event_base, &IO->recv_event);
326                         ev_io_stop(event_base, &IO->send_event);
327                 }
328                 IO->SendBuf.fd = fd;
329                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
330                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
331                 curl_multi_assign(mhnd, fd, IO);
332         }
333
334         IO->Now = ev_now(event_base);
335
336         Action = "";
337         switch (action)
338         {
339         case CURL_POLL_NONE:
340                 Action = "CURL_POLL_NONE";
341                 break;
342         case CURL_POLL_REMOVE:
343                 Action = "CURL_POLL_REMOVE";
344                 break;
345         case CURL_POLL_IN:
346                 Action = "CURL_POLL_IN";
347                 break;
348         case CURL_POLL_OUT:
349                 Action = "CURL_POLL_OUT";
350                 break;
351         case CURL_POLL_INOUT:
352                 Action = "CURL_POLL_INOUT";
353                 break;
354         }
355
356
357         EVCURL_syslog(LOG_DEBUG,
358                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
359                       (int)fd, Action, action);
360
361         switch (action)
362         {
363         case CURL_POLL_NONE:
364                 EVCURLM_syslog(LOG_DEBUG,
365                                "called first time "
366                                "to register this sockwatcker\n");
367                 break;
368         case CURL_POLL_REMOVE:
369                 EVCURLM_syslog(LOG_DEBUG,
370                                "called last time to unregister "
371                                "this sockwatcher\n");
372                 ev_io_stop(event_base, &IO->recv_event);
373                 ev_io_stop(event_base, &IO->send_event);
374                 break;
375         case CURL_POLL_IN:
376                 ev_io_start(event_base, &IO->recv_event);
377                 ev_io_stop(event_base, &IO->send_event);
378                 break;
379         case CURL_POLL_OUT:
380                 ev_io_start(event_base, &IO->send_event);
381                 ev_io_stop(event_base, &IO->recv_event);
382                 break;
383         case CURL_POLL_INOUT:
384                 ev_io_start(event_base, &IO->send_event);
385                 ev_io_start(event_base, &IO->recv_event);
386                 break;
387         }
388         return 0;
389 }
390
391 void curl_init_connectionpool(void)
392 {
393         CURLM *mhnd ;
394
395         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
396         global.timeev.data = (void *)&global;
397         global.nrun = -1;
398         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
399
400         if (sta)
401         {
402                 CURL_syslog(LOG_ERR,
403                             "error initializing curl library: %s\n",
404                             curl_easy_strerror(sta));
405
406                 exit(1);
407         }
408         mhnd = global.mhnd = curl_multi_init();
409         if (!mhnd)
410         {
411                 CURLM_syslog(LOG_ERR,
412                              "error initializing curl multi handle\n");
413                 exit(3);
414         }
415
416         MOPT(SOCKETFUNCTION, &gotwatchsock);
417         MOPT(SOCKETDATA, (void *)&global);
418         MOPT(TIMERFUNCTION, &gotwatchtime);
419         MOPT(TIMERDATA, (void *)&global);
420
421         return;
422 }
423
424 int evcurl_init(AsyncIO *IO)
425 {
426         CURLcode sta;
427         CURL *chnd;
428
429         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
430         IO->HttpReq.attached = 0;
431         chnd = IO->HttpReq.chnd = curl_easy_init();
432         if (!chnd)
433         {
434                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
435                 return 0;
436         }
437
438 #if DEBUG
439         OPT(VERBOSE, (long)1);
440 #endif
441         OPT(NOPROGRESS, 1L);
442
443         OPT(NOSIGNAL, 1L);
444         OPT(FAILONERROR, (long)1);
445         OPT(ENCODING, "");
446         OPT(FOLLOWLOCATION, (long)0);
447         OPT(MAXREDIRS, (long)0);
448         OPT(USERAGENT, CITADEL);
449
450         OPT(TIMEOUT, (long)1800);
451         OPT(LOW_SPEED_LIMIT, (long)64);
452         OPT(LOW_SPEED_TIME, (long)600);
453         OPT(CONNECTTIMEOUT, (long)600);
454         OPT(PRIVATE, (void *)IO);
455
456         OPT(FORBID_REUSE, 1);
457         OPT(WRITEFUNCTION, &gotdata);
458         OPT(WRITEDATA, (void *)IO);
459         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
460
461         if ((!IsEmptyStr(config.c_ip_addr))
462                 && (strcmp(config.c_ip_addr, "*"))
463                 && (strcmp(config.c_ip_addr, "::"))
464                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
465                 )
466         {
467                 OPT(INTERFACE, config.c_ip_addr);
468         }
469
470 #ifdef CURLOPT_HTTP_CONTENT_DECODING
471         OPT(HTTP_CONTENT_DECODING, 1);
472         OPT(ENCODING, "");
473 #endif
474
475         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
476                                                 "Connection: close");
477
478         return 1;
479 }
480
481
482 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
483                                            ev_cleanup *watcher,
484                                            int revents)
485 {
486         CURLMcode msta;
487         AsyncIO *IO = watcher->data;
488         IO->Now = ev_now(event_base);
489         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
490
491         curl_slist_free_all(IO->HttpReq.headers);
492         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
493         if (msta)
494         {
495                 EVCURL_syslog(LOG_ERR,
496                               "EVCURL: warning problem detaching completed handle "
497                               "from curl multi: %s\n",
498                               curl_multi_strerror(msta));
499         }
500
501         curl_easy_cleanup(IO->HttpReq.chnd);
502         IO->HttpReq.chnd = NULL;
503         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
504         ev_io_stop(event_base, &IO->recv_event);
505         ev_io_stop(event_base, &IO->send_event);
506         assert(IO->ShutdownAbort);
507         IO->ShutdownAbort(IO);
508 }
509 eNextState
510 evcurl_handle_start(AsyncIO *IO)
511 {
512         CURLMcode msta;
513         CURLcode sta;
514         CURL *chnd;
515
516         chnd = IO->HttpReq.chnd;
517         EVCURL_syslog(LOG_DEBUG,
518                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
519         OPT(URL, IO->ConnectMe->PlainUrl);
520         if (StrLength(IO->ConnectMe->CurlCreds))
521         {
522                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
523                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
524         }
525         if (StrLength(IO->HttpReq.PostData) > 0)
526         {
527                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
528                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
529
530         }
531         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
532                  (IO->HttpReq.PlainPostData != NULL))
533         {
534                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
535                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
536         }
537         OPT(HTTPHEADER, IO->HttpReq.headers);
538
539         IO->NextState = eConnect;
540         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
541         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
542         if (msta)
543         {
544                 EVCURL_syslog(LOG_ERR,
545                           "EVCURL: error attaching to curl multi handle: %s\n",
546                           curl_multi_strerror(msta));
547         }
548
549         IO->HttpReq.attached = 1;
550         ev_async_send (event_base, &WakeupCurl);
551         ev_cleanup_init(&IO->abort_by_shutdown,
552                         IOcurl_abort_shutdown_callback);
553
554         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
555         return eReadMessage;
556 }
557
558 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
559 {
560         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
561
562         curl_multi_perform(&global, CURL_POLL_NONE);
563 }
564
565 static void evcurl_shutdown (void)
566 {
567         curl_global_cleanup();
568         curl_multi_cleanup(global.mhnd);
569         CURLM_syslog(LOG_DEBUG, "exiting\n");
570 }
571 /*****************************************************************************
572  *                       libevent integration                                *
573  *****************************************************************************/
574 /*
575  * client event queue plus its methods.
576  * this currently is the main loop (which may change in some future?)
577  */
578 int evbase_count = 0;
579 int event_add_pipe[2] = {-1, -1};
580 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
581 HashList *QueueEvents = NULL;
582 HashList *InboundEventQueue = NULL;
583 HashList *InboundEventQueues[2] = { NULL, NULL };
584
585 ev_async AddJob;
586 ev_async ExitEventLoop;
587
588 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
589 {
590         ev_tstamp Now;
591         HashList *q;
592         void *v;
593         HashPos*It;
594         long len;
595         const char *Key;
596
597         /* get the control command... */
598         pthread_mutex_lock(&EventQueueMutex);
599
600         if (InboundEventQueues[0] == InboundEventQueue) {
601                 InboundEventQueue = InboundEventQueues[1];
602                 q = InboundEventQueues[0];
603         }
604         else {
605                 InboundEventQueue = InboundEventQueues[0];
606                 q = InboundEventQueues[1];
607         }
608         pthread_mutex_unlock(&EventQueueMutex);
609         Now = ev_now (event_base);
610         It = GetNewHashPos(q, 0);
611         while (GetNextHashPos(q, It, &len, &Key, &v))
612         {
613                 IOAddHandler *h = v;
614                 if (h->IO->ID == 0) {
615                         h->IO->ID = EvIDSource++;
616                 }
617                 if (h->IO->StartIO == 0.0)
618                         h->IO->StartIO = Now;
619                 h->IO->Now = Now;
620                 h->EvAttch(h->IO);
621         }
622         DeleteHashPos(&It);
623         DeleteHashContent(&q);
624         EVQM_syslog(LOG_DEBUG, "EVENT Q Add done.\n");
625 }
626
627
628 static void EventExitCallback(EV_P_ ev_async *w, int revents)
629 {
630         ev_break(event_base, EVBREAK_ALL);
631
632         EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
633 }
634
635
636
637 void InitEventQueue(void)
638 {
639         struct rlimit LimitSet;
640
641         pthread_mutex_init(&EventQueueMutex, NULL);
642
643         if (pipe(event_add_pipe) != 0) {
644                 syslog(LOG_EMERG,
645                        "Unable to create pipe for libev queueing: %s\n",
646                        strerror(errno));
647                 abort();
648         }
649         LimitSet.rlim_cur = 1;
650         LimitSet.rlim_max = 1;
651         setrlimit(event_add_pipe[1], &LimitSet);
652
653         QueueEvents = NewHash(1, Flathash);
654         InboundEventQueues[0] = NewHash(1, Flathash);
655         InboundEventQueues[1] = NewHash(1, Flathash);
656         InboundEventQueue = InboundEventQueues[0];
657 }
658 extern void CtdlDestroyEVCleanupHooks(void);
659
660 extern int EVQShutDown;
661 /*
662  * this thread operates the select() etc. via libev.
663  */
664 void *client_event_thread(void *arg) 
665 {
666         struct CitContext libev_client_CC;
667
668         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
669 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
670         EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");
671
672         event_base = ev_default_loop (EVFLAG_AUTO);
673         ev_async_init(&AddJob, QueueEventAddCallback);
674         ev_async_start(event_base, &AddJob);
675         ev_async_init(&ExitEventLoop, EventExitCallback);
676         ev_async_start(event_base, &ExitEventLoop);
677         ev_async_init(&WakeupCurl, WakeupCurlCallback);
678         ev_async_start(event_base, &WakeupCurl);
679
680         curl_init_connectionpool();
681
682         ev_run (event_base, 0);
683
684         EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
685
686 ///what todo here?      CtdlClearSystemContext();
687         ev_loop_destroy (EV_DEFAULT_UC);
688         DeleteHash(&QueueEvents);
689         InboundEventQueue = NULL;
690         DeleteHash(&InboundEventQueues[0]);
691         DeleteHash(&InboundEventQueues[1]);
692 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
693         evcurl_shutdown();
694         close(event_add_pipe[0]);
695         close(event_add_pipe[1]);
696
697         CtdlDestroyEVCleanupHooks();
698
699         EVQShutDown = 1;        
700         return(NULL);
701 }
702
703 /*----------------------------------------------------------------------------*/
704 /*
705  * DB-Queue; does async bdb operations.
706  * has its own set of handlers.
707  */
708 ev_loop *event_db;
709 int evdb_count = 0;
710 int evdb_add_pipe[2] = {-1, -1};
711 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
712 HashList *DBQueueEvents = NULL;
713 HashList *DBInboundEventQueue = NULL;
714 HashList *DBInboundEventQueues[2] = { NULL, NULL };
715
716 ev_async DBAddJob;
717 ev_async DBExitEventLoop;
718
719 extern void ShutDownDBCLient(AsyncIO *IO);
720
721 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
722 {
723         ev_tstamp Now;
724         HashList *q;
725         void *v;
726         HashPos *It;
727         long len;
728         const char *Key;
729
730         /* get the control command... */
731         pthread_mutex_lock(&DBEventQueueMutex);
732
733         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
734                 DBInboundEventQueue = DBInboundEventQueues[1];
735                 q = DBInboundEventQueues[0];
736         }
737         else {
738                 DBInboundEventQueue = DBInboundEventQueues[0];
739                 q = DBInboundEventQueues[1];
740         }
741         pthread_mutex_unlock(&DBEventQueueMutex);
742
743         Now = ev_now (event_db);
744         It = GetNewHashPos(q, 0);
745         while (GetNextHashPos(q, It, &len, &Key, &v))
746         {
747                 IOAddHandler *h = v;
748                 eNextState rc;
749                 if (h->IO->ID == 0)
750                         h->IO->ID = EvIDSource++;
751                 if (h->IO->StartDB == 0.0)
752                         h->IO->StartDB = Now;
753                 h->IO->Now = Now;
754                 rc = h->EvAttch(h->IO);
755                 switch (rc)
756                 {
757                 case eAbort:
758                         ShutDownDBCLient(h->IO);
759                 default:
760                         break;
761                 }
762         }
763         DeleteHashPos(&It);
764         DeleteHashContent(&q);
765         EVQM_syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
766 }
767
768
769 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
770 {
771         EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
772         ev_break(event_db, EVBREAK_ALL);
773 }
774
775
776
777 void DBInitEventQueue(void)
778 {
779         struct rlimit LimitSet;
780
781         pthread_mutex_init(&DBEventQueueMutex, NULL);
782
783         if (pipe(evdb_add_pipe) != 0) {
784                 EVQ_syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
785                 abort();
786         }
787         LimitSet.rlim_cur = 1;
788         LimitSet.rlim_max = 1;
789         setrlimit(evdb_add_pipe[1], &LimitSet);
790
791         DBQueueEvents = NewHash(1, Flathash);
792         DBInboundEventQueues[0] = NewHash(1, Flathash);
793         DBInboundEventQueues[1] = NewHash(1, Flathash);
794         DBInboundEventQueue = DBInboundEventQueues[0];
795 }
796
797 /*
798  * this thread operates writing to the message database via libev.
799  */
800 void *db_event_thread(void *arg)
801 {
802         struct CitContext libev_msg_CC;
803
804         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
805 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
806         EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
807
808         event_db = ev_loop_new (EVFLAG_AUTO);
809
810         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
811         ev_async_start(event_db, &DBAddJob);
812         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
813         ev_async_start(event_db, &DBExitEventLoop);
814
815         ev_run (event_db, 0);
816
817         EVQM_syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
818
819 //// what to do here?   CtdlClearSystemContext();
820         ev_loop_destroy (event_db);
821
822         DeleteHash(&DBQueueEvents);
823         DBInboundEventQueue = NULL;
824         DeleteHash(&DBInboundEventQueues[0]);
825         DeleteHash(&DBInboundEventQueues[1]);
826
827         close(evdb_add_pipe[0]);
828         close(evdb_add_pipe[1]);
829 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
830
831         return(NULL);
832 }
833
834 void ShutDownEventQueues(void)
835 {
836         EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
837
838         pthread_mutex_lock(&DBEventQueueMutex);
839         ev_async_send (event_db, &DBExitEventLoop);
840         pthread_mutex_unlock(&DBEventQueueMutex);
841
842         pthread_mutex_lock(&EventQueueMutex);
843         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
844         pthread_mutex_unlock(&EventQueueMutex);
845 }
846
847 void DebugEventloopEnable(const int n)
848 {
849         DebugEventLoop = n;
850 }
851 void DebugEventloopBacktraceEnable(const int n)
852 {
853         DebugEventLoopBacktrace = n;
854 }
855
856 void DebugCurlEnable(const int n)
857 {
858         DebugCurl = n;
859 }
860
861 CTDL_MODULE_INIT(event_client)
862 {
863         if (!threading)
864         {
865                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
866                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
867                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
868                 InitEventQueue();
869                 DBInitEventQueue();
870                 CtdlThreadCreate(client_event_thread);
871                 CtdlThreadCreate(db_event_thread);
872         }
873         return "event";
874 }