Indents are 8 characters wide and are expressed as a tab character.
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2015 by the citadel.org team
3  *
4  * This program is open source software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 3.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  */
12
13 #include "sysdep.h"
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <stdio.h>
17 #include <termios.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <pwd.h>
21 #include <errno.h>
22 #include <sys/types.h>
23 #include <syslog.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35 #include <sys/wait.h>
36 #include <ctype.h>
37 #include <string.h>
38 #include <limits.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <assert.h>
42 #include <arpa/inet.h>
43 #include <libcitadel.h>
44 #include <curl/curl.h>
45 #include <curl/multi.h>
46 #include "citadel.h"
47 #include "server.h"
48 #include "citserver.h"
49 #include "support.h"
50 #include "ctdl_module.h"
51 #include "config.h"
52 #include "event_client.h"
53 #include "serv_curl.h"
54
55 ev_loop *event_base;
56 int DebugEventLoop = 0;
57 int DebugEventLoopBacktrace = 0;
58 int DebugCurl = 0;
59 pthread_key_t evConKey;
60
61 long EvIDSource = 1;
62 /*****************************************************************************
63  *                   libevent / curl integration                             *
64  *****************************************************************************/
65
66 #define MOPT(s, v)                                                      \
67         do {                                                            \
68                 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
69                 if (sta) {                                              \
70                         syslog(LOG_ERR, "error setting option " \
71                                #s " on curl multi handle: %s\n",        \
72                                curl_easy_strerror(sta));                \
73                         exit (1);                                       \
74                 }                                                       \
75         } while (0)
76
77
78 typedef struct _evcurl_global_data {
79         int magic;
80         CURLM *mhnd;
81         ev_timer timeev;
82         int nrun;
83 } evcurl_global_data;
84
85 ev_async WakeupCurl;
86 evcurl_global_data global;
87
88 eNextState QueueAnDBOperation(AsyncIO *IO);
89
90 static void
91 gotstatus(int nnrun)
92 {
93         CURLMsg *msg;
94         int nmsg;
95
96         global.nrun = nnrun;
97
98         syslog(LOG_DEBUG,
99                      "gotstatus(): about to call curl_multi_info_read\n");
100         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
101                 syslog(LOG_DEBUG,
102                             "got curl multi_info message msg=%d\n",
103                             msg->msg);
104
105                 if (CURLMSG_DONE == msg->msg) {
106                         CURL *chnd;
107                         void *chandle = NULL;
108                         CURLcode sta;
109                         CURLMcode msta;
110                         AsyncIO*IO;
111
112                         chandle = NULL;;
113                         chnd = msg->easy_handle;
114                         sta = curl_easy_getinfo(chnd,
115                                                 CURLINFO_PRIVATE,
116                                                 &chandle);
117                         if (sta) {
118                                 syslog(LOG_ERR,
119                                        "error asking curl for private"
120                                        " cookie of curl handle: %s\n",
121                                        curl_easy_strerror(sta));
122                                 continue;
123                         }
124                         IO = (AsyncIO *)chandle;
125                         if (IO->ID == 0) {
126                                 syslog(LOG_ERR,
127                                               "Error, invalid IO context %p\n",
128                                               IO);
129                                 continue;
130                         }
131                         SetEVState(IO, eCurlGotStatus);
132
133                         syslog(LOG_DEBUG, "request complete\n");
134
135                         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
136
137                         ev_io_stop(event_base, &IO->recv_event);
138                         ev_io_stop(event_base, &IO->send_event);
139
140                         sta = msg->data.result;
141                         if (sta) {
142                                 syslog(LOG_ERR,
143                                               "error description: %s\n",
144                                               IO->HttpReq.errdesc);
145                                 IO->HttpReq.CurlError = curl_easy_strerror(sta);
146                                 syslog(LOG_ERR,
147                                               "error performing request: %s\n",
148                                               IO->HttpReq.CurlError);
149                                 if (sta == CURLE_OPERATION_TIMEDOUT)
150                                 {
151                                         IO->SendBuf.fd = 0;
152                                         IO->RecvBuf.fd = 0;
153                                 }
154                         }
155                         sta = curl_easy_getinfo(chnd,
156                                                 CURLINFO_RESPONSE_CODE,
157                                                 &IO->HttpReq.httpcode);
158                         if (sta)
159                                 syslog(LOG_ERR,
160                                               "error asking curl for "
161                                               "response code from request: %s\n",
162                                               curl_easy_strerror(sta));
163                         syslog(LOG_DEBUG,
164                                       "http response code was %ld\n",
165                                       (long)IO->HttpReq.httpcode);
166
167
168                         curl_slist_free_all(IO->HttpReq.headers);
169                         IO->HttpReq.headers = NULL;
170                         msta = curl_multi_remove_handle(global.mhnd, chnd);
171                         if (msta)
172                                 syslog(LOG_ERR,
173                                               "warning problem detaching "
174                                               "completed handle from curl multi: "
175                                               "%s\n",
176                                               curl_multi_strerror(msta));
177
178                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
179
180                         IO->HttpReq.attached = 0;
181                         switch(IO->SendDone(IO))
182                         {
183                         case eDBQuery:
184                                 FreeURL(&IO->ConnectMe);
185                                 QueueAnDBOperation(IO);
186                                 break;
187                         case eSendDNSQuery:
188                         case eReadDNSReply:
189                         case eConnect:
190                         case eSendReply:
191                         case eSendMore:
192                         case eSendFile:
193                         case eReadMessage:
194                         case eReadMore:
195                         case eReadPayload:
196                         case eReadFile:
197                                 break;
198                         case eTerminateConnection:
199                         case eAbort:
200                                 curl_easy_cleanup(IO->HttpReq.chnd);
201                                 IO->HttpReq.chnd = NULL;
202                                 FreeStrBuf(&IO->HttpReq.ReplyData);
203                                 FreeURL(&IO->ConnectMe);
204                                 RemoveContext(IO->CitContext);
205                                 IO->Terminate(IO);
206                         }
207                 }
208         }
209 }
210
211 static void
212 stepmulti(void *data, curl_socket_t fd, int which)
213 {
214         int running_handles = 0;
215         CURLMcode msta;
216
217         msta = curl_multi_socket_action(global.mhnd,
218                                         fd,
219                                         which,
220                                         &running_handles);
221
222         syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
223         if (msta)
224                 syslog(LOG_ERR,
225                             "error in curl processing events"
226                             "on multi handle, fd %d: %s\n",
227                             (int)fd,
228                             curl_multi_strerror(msta));
229
230         if (global.nrun != running_handles)
231                 gotstatus(running_handles);
232 }
233
234 static void
235 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
236 {
237         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
238         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
239 }
240
241 static void
242 got_in(struct ev_loop *loop, ev_io *ioev, int events)
243 {
244         syslog(LOG_DEBUG,
245                     "EVCURL: waking up curl for io on fd %d\n",
246                     (int)ioev->fd);
247
248         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
249 }
250
251 static void
252 got_out(struct ev_loop *loop, ev_io *ioev, int events)
253 {
254         syslog(LOG_DEBUG,
255                     "waking up curl for io on fd %d\n",
256                     (int)ioev->fd);
257
258         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
259 }
260
261 static size_t
262 gotdata(void *data, size_t size, size_t nmemb, void *cglobal)
263 {
264         AsyncIO *IO = (AsyncIO*) cglobal;
265
266         SetEVState(IO, eCurlGotData);
267         if (IO->HttpReq.ReplyData == NULL)
268         {
269                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
270         }
271         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
272         return CurlFillStrBuf_callback(data,
273                                        size,
274                                        nmemb,
275                                        IO->HttpReq.ReplyData);
276 }
277
278 static int
279 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
280         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
281         evcurl_global_data *global = cglobal;
282         ev_timer_stop(EV_DEFAULT, &global->timeev);
283         if (tblock_ms < 0 || 14000 < tblock_ms)
284                 tblock_ms = 14000;
285         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
286         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
287         curl_multi_perform(global, &global->nrun);
288         return 0;
289 }
290
291 static int
292 gotwatchsock(CURL *easy,
293              curl_socket_t fd,
294              int action,
295              void *cglobal,
296              void *vIO)
297 {
298         evcurl_global_data *global = cglobal;
299         CURLM *mhnd = global->mhnd;
300         void *f;
301         AsyncIO *IO = (AsyncIO*) vIO;
302         CURLcode sta;
303         const char *Action;
304
305         if (IO == NULL) {
306                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
307                 if (sta) {
308                         syslog(LOG_ERR,
309                                     "EVCURL: error asking curl for private "
310                                     "cookie of curl handle: %s\n",
311                                     curl_easy_strerror(sta));
312                         return -1;
313                 }
314                 IO = (AsyncIO *) f;
315                 SetEVState(IO, eCurlNewIO);
316                 syslog(LOG_DEBUG,
317                               "EVCURL: got socket for URL: %s\n",
318                               IO->ConnectMe->PlainUrl);
319
320                 if (IO->SendBuf.fd != 0)
321                 {
322                         ev_io_stop(event_base, &IO->recv_event);
323                         ev_io_stop(event_base, &IO->send_event);
324                 }
325                 IO->SendBuf.fd = fd;
326                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
327                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
328                 curl_multi_assign(mhnd, fd, IO);
329         }
330
331         SetEVState(IO, eCurlGotIO);
332         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
333
334         Action = "";
335         switch (action)
336         {
337         case CURL_POLL_NONE:
338                 Action = "CURL_POLL_NONE";
339                 break;
340         case CURL_POLL_REMOVE:
341                 Action = "CURL_POLL_REMOVE";
342                 break;
343         case CURL_POLL_IN:
344                 Action = "CURL_POLL_IN";
345                 break;
346         case CURL_POLL_OUT:
347                 Action = "CURL_POLL_OUT";
348                 break;
349         case CURL_POLL_INOUT:
350                 Action = "CURL_POLL_INOUT";
351                 break;
352         }
353
354
355         syslog(LOG_DEBUG,
356                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
357                       (int)fd, Action, action);
358
359         switch (action)
360         {
361         case CURL_POLL_NONE:
362                 syslog(LOG_DEBUG,
363                                "called first time "
364                                "to register this sockwatcker\n");
365                 break;
366         case CURL_POLL_REMOVE:
367                 syslog(LOG_DEBUG,
368                                "called last time to unregister "
369                                "this sockwatcher\n");
370                 ev_io_stop(event_base, &IO->recv_event);
371                 ev_io_stop(event_base, &IO->send_event);
372                 break;
373         case CURL_POLL_IN:
374                 ev_io_start(event_base, &IO->recv_event);
375                 ev_io_stop(event_base, &IO->send_event);
376                 break;
377         case CURL_POLL_OUT:
378                 ev_io_start(event_base, &IO->send_event);
379                 ev_io_stop(event_base, &IO->recv_event);
380                 break;
381         case CURL_POLL_INOUT:
382                 ev_io_start(event_base, &IO->send_event);
383                 ev_io_start(event_base, &IO->recv_event);
384                 break;
385         }
386         return 0;
387 }
388
389 void curl_init_connectionpool(void)
390 {
391         CURLM *mhnd ;
392
393         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
394         global.timeev.data = (void *)&global;
395         global.nrun = -1;
396         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
397
398         if (sta)
399         {
400                 syslog(LOG_ERR,
401                             "error initializing curl library: %s\n",
402                             curl_easy_strerror(sta));
403
404                 exit(1);
405         }
406         mhnd = global.mhnd = curl_multi_init();
407         if (!mhnd)
408         {
409                 syslog(LOG_ERR,
410                              "error initializing curl multi handle\n");
411                 exit(3);
412         }
413
414         MOPT(SOCKETFUNCTION, &gotwatchsock);
415         MOPT(SOCKETDATA, (void *)&global);
416         MOPT(TIMERFUNCTION, &gotwatchtime);
417         MOPT(TIMERDATA, (void *)&global);
418
419         return;
420 }
421
422 int evcurl_init(AsyncIO *IO)
423 {
424         CURLcode sta;
425         CURL *chnd;
426
427         syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
428         IO->HttpReq.attached = 0;
429         chnd = IO->HttpReq.chnd = curl_easy_init();
430         if (!chnd)
431         {
432                 syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
433                 return 0;
434         }
435
436 #if DEBUG
437         OPT(VERBOSE, (long)1);
438 #endif
439         OPT(NOPROGRESS, 1L);
440
441         OPT(NOSIGNAL, 1L);
442         OPT(FAILONERROR, (long)1);
443         OPT(ENCODING, "");
444         OPT(FOLLOWLOCATION, (long)0);
445         OPT(MAXREDIRS, (long)0);
446         OPT(USERAGENT, CITADEL);
447
448         OPT(TIMEOUT, (long)1800);
449         OPT(LOW_SPEED_LIMIT, (long)64);
450         OPT(LOW_SPEED_TIME, (long)600);
451         OPT(CONNECTTIMEOUT, (long)600);
452         OPT(PRIVATE, (void *)IO);
453
454         OPT(FORBID_REUSE, 1);
455         OPT(WRITEFUNCTION, &gotdata);
456         OPT(WRITEDATA, (void *)IO);
457         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
458
459         if ((!IsEmptyStr(CtdlGetConfigStr("c_ip_addr")))
460                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "*"))
461                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "::"))
462                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "0.0.0.0"))
463                 )
464         {
465                 OPT(INTERFACE, CtdlGetConfigStr("c_ip_addr"));
466         }
467
468 #ifdef CURLOPT_HTTP_CONTENT_DECODING
469         OPT(HTTP_CONTENT_DECODING, 1);
470         OPT(ENCODING, "");
471 #endif
472
473         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
474                                                 "Connection: close");
475
476         return 1;
477 }
478
479
480 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
481                                            ev_cleanup *watcher,
482                                            int revents)
483 {
484         CURLMcode msta;
485         AsyncIO *IO = watcher->data;
486
487         if (IO == NULL)
488                 return;
489
490         SetEVState(IO, eCurlShutdown);
491         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
492         syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
493
494         curl_slist_free_all(IO->HttpReq.headers);
495         IO->HttpReq.headers = NULL;
496         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
497         if (msta)
498         {
499                 syslog(LOG_ERR,
500                               "EVCURL: warning problem detaching completed handle "
501                               "from curl multi: %s\n",
502                               curl_multi_strerror(msta));
503         }
504
505         curl_easy_cleanup(IO->HttpReq.chnd);
506         IO->HttpReq.chnd = NULL;
507         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
508         ev_io_stop(event_base, &IO->recv_event);
509         ev_io_stop(event_base, &IO->send_event);
510         assert(IO->ShutdownAbort);
511         IO->ShutdownAbort(IO);
512 }
513
514 eNextState
515 evcurl_handle_start(AsyncIO *IO)
516 {
517         CURLMcode msta;
518         CURLcode sta;
519         CURL *chnd;
520
521         SetEVState(IO, eCurlStart);
522         chnd = IO->HttpReq.chnd;
523         syslog(LOG_DEBUG,
524                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
525         OPT(URL, IO->ConnectMe->PlainUrl);
526         if (StrLength(IO->ConnectMe->CurlCreds))
527         {
528                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
529                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
530         }
531         if (StrLength(IO->HttpReq.PostData) > 0)
532         {
533                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
534                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
535
536         }
537         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
538                  (IO->HttpReq.PlainPostData != NULL))
539         {
540                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
541                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
542         }
543         OPT(HTTPHEADER, IO->HttpReq.headers);
544
545         IO->NextState = eConnect;
546         syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
547         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
548         if (msta)
549         {
550                 syslog(LOG_ERR,
551                           "EVCURL: error attaching to curl multi handle: %s\n",
552                           curl_multi_strerror(msta));
553         }
554
555         IO->HttpReq.attached = 1;
556         ev_async_send (event_base, &WakeupCurl);
557
558         ev_cleanup_init(&IO->abort_by_shutdown,
559                         IOcurl_abort_shutdown_callback);
560
561         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
562
563         return eReadMessage;
564 }
565
566 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
567 {
568         syslog(LOG_DEBUG, "waking up curl multi handle\n");
569
570         curl_multi_perform(&global, CURL_POLL_NONE);
571 }
572
573 static void evcurl_shutdown (void)
574 {
575         curl_global_cleanup();
576         curl_multi_cleanup(global.mhnd);
577         syslog(LOG_DEBUG, "exiting\n");
578 }
579 /*****************************************************************************
580  *                       libevent integration                                *
581  *****************************************************************************/
582 /*
583  * client event queue plus its methods.
584  * this currently is the main loop (which may change in some future?)
585  */
586 int evbase_count = 0;
587 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
588 pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
589 HashList *QueueEvents = NULL;
590 HashList *InboundEventQueue = NULL;
591 HashList *InboundEventQueues[2] = { NULL, NULL };
592 extern void ShutDownCLient(AsyncIO *IO);
593
594 ev_async AddJob;
595 ev_async ExitEventLoop;
596
597 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
598 {
599         CitContext *Ctx;
600         long IOID = -1;
601         long count = 0;
602         ev_tstamp Now;
603         HashList *q;
604         void *v;
605         HashPos*It;
606         long len;
607         const char *Key;
608
609         /* get the control command... */
610         pthread_mutex_lock(&EventQueueMutex);
611
612         if (InboundEventQueues[0] == InboundEventQueue) {
613                 InboundEventQueue = InboundEventQueues[1];
614                 q = InboundEventQueues[0];
615         }
616         else {
617                 InboundEventQueue = InboundEventQueues[0];
618                 q = InboundEventQueues[1];
619         }
620         pthread_mutex_unlock(&EventQueueMutex);
621         Now = ev_now (event_base);
622         It = GetNewHashPos(q, 0);
623         while (GetNextHashPos(q, It, &len, &Key, &v))
624         {
625                 IOAddHandler *h = v;
626                 count ++;
627                 if (h->IO->ID == 0) {
628                         h->IO->ID = EvIDSource++;
629                 }
630                 IOID = h->IO->ID;
631                 if (h->IO->StartIO == 0.0)
632                         h->IO->StartIO = Now;
633
634                 SetEVState(h->IO, eIOAttach);
635
636                 Ctx = h->IO->CitContext;
637                 become_session(Ctx);
638
639                 h->IO->CitContext->lastcmd = h->IO->Now = Now;
640                 switch (h->EvAttch(h->IO))
641                 {
642                 case eReadMore:
643                 case eReadMessage:
644                 case eReadFile:
645                 case eSendReply:
646                 case eSendMore:
647                 case eReadPayload:
648                 case eSendFile:
649                 case eDBQuery:
650                 case eSendDNSQuery:
651                 case eReadDNSReply:
652                 case eConnect:
653                         break;
654                 case eTerminateConnection:
655                 case eAbort:
656                         ShutDownCLient(h->IO);
657                 break;
658                 }
659         }
660         DeleteHashPos(&It);
661         DeleteHashContent(&q);
662         syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld  done.", IOSTR, IOID, count);
663 }
664
665
666 static void EventExitCallback(EV_P_ ev_async *w, int revents)
667 {
668         ev_break(event_base, EVBREAK_ALL);
669
670         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
671 }
672
673
674
675 void InitEventQueue(void)
676 {
677         pthread_mutex_init(&EventQueueMutex, NULL);
678         pthread_mutex_init(&EventExitQueueMutex, NULL);
679
680         QueueEvents = NewHash(1, Flathash);
681         InboundEventQueues[0] = NewHash(1, Flathash);
682         InboundEventQueues[1] = NewHash(1, Flathash);
683         InboundEventQueue = InboundEventQueues[0];
684 }
685 extern void CtdlDestroyEVCleanupHooks(void);
686
687 extern int EVQShutDown;
688 const char *IOLog = "IO";
689 /*
690  * this thread operates the select() etc. via libev.
691  */
692 void *client_event_thread(void *arg) 
693 {
694         struct CitContext libev_client_CC;
695
696         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
697
698         pthread_setspecific(evConKey, IOLog);
699
700         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
701
702         event_base = ev_default_loop (EVFLAG_AUTO);
703         ev_async_init(&AddJob, QueueEventAddCallback);
704         ev_async_start(event_base, &AddJob);
705         ev_async_init(&ExitEventLoop, EventExitCallback);
706         ev_async_start(event_base, &ExitEventLoop);
707         ev_async_init(&WakeupCurl, WakeupCurlCallback);
708         ev_async_start(event_base, &WakeupCurl);
709
710         curl_init_connectionpool();
711
712         ev_run (event_base, 0);
713
714         syslog(LOG_DEBUG, "client_event_thread() exiting\n");
715
716 ///what todo here?      CtdlClearSystemContext();
717         pthread_mutex_lock(&EventExitQueueMutex);
718         ev_loop_destroy (EV_DEFAULT_UC);
719         event_base = NULL;
720         DeleteHash(&QueueEvents);
721         InboundEventQueue = NULL;
722         DeleteHash(&InboundEventQueues[0]);
723         DeleteHash(&InboundEventQueues[1]);
724 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
725         evcurl_shutdown();
726
727         CtdlDestroyEVCleanupHooks();
728
729         pthread_mutex_unlock(&EventExitQueueMutex);
730         EVQShutDown = 1;
731         return(NULL);
732 }
733
734 /*----------------------------------------------------------------------------*/
735 /*
736  * DB-Queue; does async bdb operations.
737  * has its own set of handlers.
738  */
739 ev_loop *event_db;
740 int evdb_count = 0;
741 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
742 pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
743 HashList *DBQueueEvents = NULL;
744 HashList *DBInboundEventQueue = NULL;
745 HashList *DBInboundEventQueues[2] = { NULL, NULL };
746
747 ev_async DBAddJob;
748 ev_async DBExitEventLoop;
749
750 extern void ShutDownDBCLient(AsyncIO *IO);
751
752 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
753 {
754         CitContext *Ctx;
755         long IOID = -1;
756         long count = 0;;
757         ev_tstamp Now;
758         HashList *q;
759         void *v;
760         HashPos *It;
761         long len;
762         const char *Key;
763
764         /* get the control command... */
765         pthread_mutex_lock(&DBEventQueueMutex);
766
767         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
768                 DBInboundEventQueue = DBInboundEventQueues[1];
769                 q = DBInboundEventQueues[0];
770         }
771         else {
772                 DBInboundEventQueue = DBInboundEventQueues[0];
773                 q = DBInboundEventQueues[1];
774         }
775         pthread_mutex_unlock(&DBEventQueueMutex);
776
777         Now = ev_now (event_db);
778         It = GetNewHashPos(q, 0);
779         while (GetNextHashPos(q, It, &len, &Key, &v))
780         {
781                 IOAddHandler *h = v;
782                 eNextState rc;
783                 count ++;
784                 if (h->IO->ID == 0)
785                         h->IO->ID = EvIDSource++;
786                 IOID = h->IO->ID;
787                 if (h->IO->StartDB == 0.0)
788                         h->IO->StartDB = Now;
789                 h->IO->CitContext->lastcmd = h->IO->Now = Now;
790
791                 SetEVState(h->IO, eDBAttach);
792                 Ctx = h->IO->CitContext;
793                 become_session(Ctx);
794                 ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
795                 rc = h->EvAttch(h->IO);
796                 switch (rc)
797                 {
798                 case eAbort:
799                         ShutDownDBCLient(h->IO);
800                 default:
801                         break;
802                 }
803         }
804         DeleteHashPos(&It);
805         DeleteHashContent(&q);
806         syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count);
807 }
808
809
810 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
811 {
812         syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
813         ev_break(event_db, EVBREAK_ALL);
814 }
815
816
817
818 void DBInitEventQueue(void)
819 {
820         pthread_mutex_init(&DBEventQueueMutex, NULL);
821         pthread_mutex_init(&DBEventExitQueueMutex, NULL);
822
823         DBQueueEvents = NewHash(1, Flathash);
824         DBInboundEventQueues[0] = NewHash(1, Flathash);
825         DBInboundEventQueues[1] = NewHash(1, Flathash);
826         DBInboundEventQueue = DBInboundEventQueues[0];
827 }
828
829 const char *DBLog = "BD";
830
831 /*
832  * this thread operates writing to the message database via libev.
833  */
834 void *db_event_thread(void *arg)
835 {
836         ev_loop *tmp;
837         struct CitContext libev_msg_CC;
838
839         pthread_setspecific(evConKey, DBLog);
840
841         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
842
843         syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
844
845         tmp = event_db = ev_loop_new (EVFLAG_AUTO);
846
847         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
848         ev_async_start(event_db, &DBAddJob);
849         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
850         ev_async_start(event_db, &DBExitEventLoop);
851
852         ev_run (event_db, 0);
853
854         pthread_mutex_lock(&DBEventExitQueueMutex);
855
856         event_db = NULL;
857         syslog(LOG_INFO, "dbevent_thread() exiting\n");
858
859         DeleteHash(&DBQueueEvents);
860         DBInboundEventQueue = NULL;
861         DeleteHash(&DBInboundEventQueues[0]);
862         DeleteHash(&DBInboundEventQueues[1]);
863
864 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
865
866         ev_loop_destroy (tmp);
867         pthread_mutex_unlock(&DBEventExitQueueMutex);
868         return(NULL);
869 }
870
871 void ShutDownEventQueues(void)
872 {
873         syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
874
875         pthread_mutex_lock(&DBEventQueueMutex);
876         ev_async_send (event_db, &DBExitEventLoop);
877         pthread_mutex_unlock(&DBEventQueueMutex);
878
879         pthread_mutex_lock(&EventQueueMutex);
880         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
881         pthread_mutex_unlock(&EventQueueMutex);
882 }
883
884 void DebugEventloopEnable(const int n)
885 {
886         DebugEventLoop = n;
887 }
888 void DebugEventloopBacktraceEnable(const int n)
889 {
890         DebugEventLoopBacktrace = n;
891 }
892
893 void DebugCurlEnable(const int n)
894 {
895         DebugCurl = n;
896 }
897
898 const char *WLog = "WX";
899 CTDL_MODULE_INIT(event_client)
900 {
901         if (!threading)
902         {
903                 if (pthread_key_create(&evConKey, NULL) != 0) {
904                         syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno));
905                 }
906                 pthread_setspecific(evConKey, WLog);
907
908                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
909                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
910                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
911                 InitEventQueue();
912                 DBInitEventQueue();
913                 CtdlThreadCreate(client_event_thread);
914                 CtdlThreadCreate(db_event_thread);
915         }
916         return "event";
917 }