55d6fe6f217f043240edadb2febc8b76b66d9faa
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2015 by the citadel.org team
3  *
4  * This program is open source software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 3.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  */
12
13 #include "sysdep.h"
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <stdio.h>
17 #include <termios.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <pwd.h>
21 #include <errno.h>
22 #include <sys/types.h>
23 #include <syslog.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35 #include <sys/wait.h>
36 #include <ctype.h>
37 #include <string.h>
38 #include <limits.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <assert.h>
42 #include <arpa/inet.h>
43 #include <libcitadel.h>
44 #include <curl/curl.h>
45 #include <curl/multi.h>
46 #include "citadel.h"
47 #include "server.h"
48 #include "citserver.h"
49 #include "support.h"
50 #include "ctdl_module.h"
51 #include "config.h"
52 #include "event_client.h"
53 #include "serv_curl.h"
54
55 ev_loop *event_base;
56 int DebugEventLoop = 0;
57 int DebugEventLoopBacktrace = 0;
58 int DebugCurl = 0;
59 pthread_key_t evConKey;
60
61 long EvIDSource = 1;
62 /*****************************************************************************
63  *                   libevent / curl integration                             *
64  *****************************************************************************/
/*
 * Gate for the logging macros below: every level other than LOG_DEBUG is
 * always logged, LOG_DEBUG only while DebugCurl is non-zero.
 * This expands to a bare `if`; use it only through the wrappers below, which
 * enclose it in do { } while (0) so that a following `else` cannot attach
 * to it.
 */
#define DBGLOG(LEVEL) if (((LEVEL) != LOG_DEBUG) || (DebugCurl != 0))

/* Log with the IO context prefix; expects `IO` to be in scope. */
#define EVCURL_syslog(LEVEL, FORMAT, ...)                               \
        do {                                                            \
                DBGLOG (LEVEL)                                          \
                        syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT,   \
                               IOSTR, IO->ID, CCID, __VA_ARGS__);       \
        } while (0)

/* Same as EVCURL_syslog() for messages without format arguments. */
#define EVCURLM_syslog(LEVEL, FORMAT)                                   \
        do {                                                            \
                DBGLOG (LEVEL)                                          \
                        syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT,   \
                               IOSTR, IO->ID, CCID);                    \
        } while (0)

/* Log without an IO context. */
#define CURL_syslog(LEVEL, FORMAT, ...)                                 \
        do {                                                            \
                DBGLOG (LEVEL)                                          \
                        syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__);    \
        } while (0)

/* Same as CURL_syslog() for messages without format arguments. */
#define CURLM_syslog(LEVEL, FORMAT)                                     \
        do {                                                            \
                DBGLOG (LEVEL)                                          \
                        syslog(LEVEL, "CURL: " FORMAT);                 \
        } while (0)

/*
 * Set option CURLMOPT_<s> on the multi handle `mhnd`, storing the result in
 * the caller's `sta`; on failure log and terminate the process.
 * curl_multi_setopt() returns a CURLMcode, so the message is produced with
 * curl_multi_strerror() rather than curl_easy_strerror().
 */
#define MOPT(s, v)                                                      \
        do {                                                            \
                sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
                if (sta) {                                              \
                        EVQ_syslog(LOG_ERR, "error setting option "     \
                               #s " on curl multi handle: %s\n",        \
                               curl_multi_strerror((CURLMcode)sta));    \
                        exit (1);                                       \
                }                                                       \
        } while (0)
91
92
93 typedef struct _evcurl_global_data {
94         int magic;
95         CURLM *mhnd;
96         ev_timer timeev;
97         int nrun;
98 } evcurl_global_data;
99
100 ev_async WakeupCurl;
101 evcurl_global_data global;
102
103 eNextState QueueAnDBOperation(AsyncIO *IO);
104
105 static void
106 gotstatus(int nnrun)
107 {
108         CURLMsg *msg;
109         int nmsg;
110
111         global.nrun = nnrun;
112
113         CURLM_syslog(LOG_DEBUG,
114                      "gotstatus(): about to call curl_multi_info_read\n");
115         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
116                 CURL_syslog(LOG_DEBUG,
117                             "got curl multi_info message msg=%d\n",
118                             msg->msg);
119
120                 if (CURLMSG_DONE == msg->msg) {
121                         CURL *chnd;
122                         void *chandle = NULL;
123                         CURLcode sta;
124                         CURLMcode msta;
125                         AsyncIO*IO;
126
127                         chandle = NULL;;
128                         chnd = msg->easy_handle;
129                         sta = curl_easy_getinfo(chnd,
130                                                 CURLINFO_PRIVATE,
131                                                 &chandle);
132                         if (sta) {
133                                 syslog(LOG_ERR,
134                                        "error asking curl for private"
135                                        " cookie of curl handle: %s\n",
136                                        curl_easy_strerror(sta));
137                                 continue;
138                         }
139                         IO = (AsyncIO *)chandle;
140                         if (IO->ID == 0) {
141                                 EVCURL_syslog(LOG_ERR,
142                                               "Error, invalid IO context %p\n",
143                                               IO);
144                                 continue;
145                         }
146                         SetEVState(IO, eCurlGotStatus);
147
148                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
149
150                         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
151
152                         ev_io_stop(event_base, &IO->recv_event);
153                         ev_io_stop(event_base, &IO->send_event);
154
155                         sta = msg->data.result;
156                         if (sta) {
157                                 EVCURL_syslog(LOG_ERR,
158                                               "error description: %s\n",
159                                               IO->HttpReq.errdesc);
160                                 IO->HttpReq.CurlError = curl_easy_strerror(sta);
161                                 EVCURL_syslog(LOG_ERR,
162                                               "error performing request: %s\n",
163                                               IO->HttpReq.CurlError);
164                                 if (sta == CURLE_OPERATION_TIMEDOUT)
165                                 {
166                                         IO->SendBuf.fd = 0;
167                                         IO->RecvBuf.fd = 0;
168                                 }
169                         }
170                         sta = curl_easy_getinfo(chnd,
171                                                 CURLINFO_RESPONSE_CODE,
172                                                 &IO->HttpReq.httpcode);
173                         if (sta)
174                                 EVCURL_syslog(LOG_ERR,
175                                               "error asking curl for "
176                                               "response code from request: %s\n",
177                                               curl_easy_strerror(sta));
178                         EVCURL_syslog(LOG_DEBUG,
179                                       "http response code was %ld\n",
180                                       (long)IO->HttpReq.httpcode);
181
182
183                         curl_slist_free_all(IO->HttpReq.headers);
184                         IO->HttpReq.headers = NULL;
185                         msta = curl_multi_remove_handle(global.mhnd, chnd);
186                         if (msta)
187                                 EVCURL_syslog(LOG_ERR,
188                                               "warning problem detaching "
189                                               "completed handle from curl multi: "
190                                               "%s\n",
191                                               curl_multi_strerror(msta));
192
193                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
194
195                         IO->HttpReq.attached = 0;
196                         switch(IO->SendDone(IO))
197                         {
198                         case eDBQuery:
199                                 FreeURL(&IO->ConnectMe);
200                                 QueueAnDBOperation(IO);
201                                 break;
202                         case eSendDNSQuery:
203                         case eReadDNSReply:
204                         case eConnect:
205                         case eSendReply:
206                         case eSendMore:
207                         case eSendFile:
208                         case eReadMessage:
209                         case eReadMore:
210                         case eReadPayload:
211                         case eReadFile:
212                                 break;
213                         case eTerminateConnection:
214                         case eAbort:
215                                 curl_easy_cleanup(IO->HttpReq.chnd);
216                                 IO->HttpReq.chnd = NULL;
217                                 FreeStrBuf(&IO->HttpReq.ReplyData);
218                                 FreeURL(&IO->ConnectMe);
219                                 RemoveContext(IO->CitContext);
220                                 IO->Terminate(IO);
221                         }
222                 }
223         }
224 }
225
226 static void
227 stepmulti(void *data, curl_socket_t fd, int which)
228 {
229         int running_handles = 0;
230         CURLMcode msta;
231
232         msta = curl_multi_socket_action(global.mhnd,
233                                         fd,
234                                         which,
235                                         &running_handles);
236
237         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
238         if (msta)
239                 CURL_syslog(LOG_ERR,
240                             "error in curl processing events"
241                             "on multi handle, fd %d: %s\n",
242                             (int)fd,
243                             curl_multi_strerror(msta));
244
245         if (global.nrun != running_handles)
246                 gotstatus(running_handles);
247 }
248
249 static void
250 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
251 {
252         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
253         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
254 }
255
256 static void
257 got_in(struct ev_loop *loop, ev_io *ioev, int events)
258 {
259         CURL_syslog(LOG_DEBUG,
260                     "EVCURL: waking up curl for io on fd %d\n",
261                     (int)ioev->fd);
262
263         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
264 }
265
266 static void
267 got_out(struct ev_loop *loop, ev_io *ioev, int events)
268 {
269         CURL_syslog(LOG_DEBUG,
270                     "waking up curl for io on fd %d\n",
271                     (int)ioev->fd);
272
273         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
274 }
275
276 static size_t
277 gotdata(void *data, size_t size, size_t nmemb, void *cglobal)
278 {
279         AsyncIO *IO = (AsyncIO*) cglobal;
280
281         SetEVState(IO, eCurlGotData);
282         if (IO->HttpReq.ReplyData == NULL)
283         {
284                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
285         }
286         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
287         return CurlFillStrBuf_callback(data,
288                                        size,
289                                        nmemb,
290                                        IO->HttpReq.ReplyData);
291 }
292
293 static int
294 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
295         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
296         evcurl_global_data *global = cglobal;
297         ev_timer_stop(EV_DEFAULT, &global->timeev);
298         if (tblock_ms < 0 || 14000 < tblock_ms)
299                 tblock_ms = 14000;
300         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
301         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
302         curl_multi_perform(global, &global->nrun);
303         return 0;
304 }
305
306 static int
307 gotwatchsock(CURL *easy,
308              curl_socket_t fd,
309              int action,
310              void *cglobal,
311              void *vIO)
312 {
313         evcurl_global_data *global = cglobal;
314         CURLM *mhnd = global->mhnd;
315         void *f;
316         AsyncIO *IO = (AsyncIO*) vIO;
317         CURLcode sta;
318         const char *Action;
319
320         if (IO == NULL) {
321                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
322                 if (sta) {
323                         CURL_syslog(LOG_ERR,
324                                     "EVCURL: error asking curl for private "
325                                     "cookie of curl handle: %s\n",
326                                     curl_easy_strerror(sta));
327                         return -1;
328                 }
329                 IO = (AsyncIO *) f;
330                 SetEVState(IO, eCurlNewIO);
331                 EVCURL_syslog(LOG_DEBUG,
332                               "EVCURL: got socket for URL: %s\n",
333                               IO->ConnectMe->PlainUrl);
334
335                 if (IO->SendBuf.fd != 0)
336                 {
337                         ev_io_stop(event_base, &IO->recv_event);
338                         ev_io_stop(event_base, &IO->send_event);
339                 }
340                 IO->SendBuf.fd = fd;
341                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
342                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
343                 curl_multi_assign(mhnd, fd, IO);
344         }
345
346         SetEVState(IO, eCurlGotIO);
347         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
348
349         Action = "";
350         switch (action)
351         {
352         case CURL_POLL_NONE:
353                 Action = "CURL_POLL_NONE";
354                 break;
355         case CURL_POLL_REMOVE:
356                 Action = "CURL_POLL_REMOVE";
357                 break;
358         case CURL_POLL_IN:
359                 Action = "CURL_POLL_IN";
360                 break;
361         case CURL_POLL_OUT:
362                 Action = "CURL_POLL_OUT";
363                 break;
364         case CURL_POLL_INOUT:
365                 Action = "CURL_POLL_INOUT";
366                 break;
367         }
368
369
370         EVCURL_syslog(LOG_DEBUG,
371                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
372                       (int)fd, Action, action);
373
374         switch (action)
375         {
376         case CURL_POLL_NONE:
377                 EVCURLM_syslog(LOG_DEBUG,
378                                "called first time "
379                                "to register this sockwatcker\n");
380                 break;
381         case CURL_POLL_REMOVE:
382                 EVCURLM_syslog(LOG_DEBUG,
383                                "called last time to unregister "
384                                "this sockwatcher\n");
385                 ev_io_stop(event_base, &IO->recv_event);
386                 ev_io_stop(event_base, &IO->send_event);
387                 break;
388         case CURL_POLL_IN:
389                 ev_io_start(event_base, &IO->recv_event);
390                 ev_io_stop(event_base, &IO->send_event);
391                 break;
392         case CURL_POLL_OUT:
393                 ev_io_start(event_base, &IO->send_event);
394                 ev_io_stop(event_base, &IO->recv_event);
395                 break;
396         case CURL_POLL_INOUT:
397                 ev_io_start(event_base, &IO->send_event);
398                 ev_io_start(event_base, &IO->recv_event);
399                 break;
400         }
401         return 0;
402 }
403
404 void curl_init_connectionpool(void)
405 {
406         CURLM *mhnd ;
407
408         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
409         global.timeev.data = (void *)&global;
410         global.nrun = -1;
411         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
412
413         if (sta)
414         {
415                 CURL_syslog(LOG_ERR,
416                             "error initializing curl library: %s\n",
417                             curl_easy_strerror(sta));
418
419                 exit(1);
420         }
421         mhnd = global.mhnd = curl_multi_init();
422         if (!mhnd)
423         {
424                 CURLM_syslog(LOG_ERR,
425                              "error initializing curl multi handle\n");
426                 exit(3);
427         }
428
429         MOPT(SOCKETFUNCTION, &gotwatchsock);
430         MOPT(SOCKETDATA, (void *)&global);
431         MOPT(TIMERFUNCTION, &gotwatchtime);
432         MOPT(TIMERDATA, (void *)&global);
433
434         return;
435 }
436
437 int evcurl_init(AsyncIO *IO)
438 {
439         CURLcode sta;
440         CURL *chnd;
441
442         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
443         IO->HttpReq.attached = 0;
444         chnd = IO->HttpReq.chnd = curl_easy_init();
445         if (!chnd)
446         {
447                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
448                 return 0;
449         }
450
451 #if DEBUG
452         OPT(VERBOSE, (long)1);
453 #endif
454         OPT(NOPROGRESS, 1L);
455
456         OPT(NOSIGNAL, 1L);
457         OPT(FAILONERROR, (long)1);
458         OPT(ENCODING, "");
459         OPT(FOLLOWLOCATION, (long)0);
460         OPT(MAXREDIRS, (long)0);
461         OPT(USERAGENT, CITADEL);
462
463         OPT(TIMEOUT, (long)1800);
464         OPT(LOW_SPEED_LIMIT, (long)64);
465         OPT(LOW_SPEED_TIME, (long)600);
466         OPT(CONNECTTIMEOUT, (long)600);
467         OPT(PRIVATE, (void *)IO);
468
469         OPT(FORBID_REUSE, 1);
470         OPT(WRITEFUNCTION, &gotdata);
471         OPT(WRITEDATA, (void *)IO);
472         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
473
474         if ((!IsEmptyStr(CtdlGetConfigStr("c_ip_addr")))
475                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "*"))
476                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "::"))
477                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "0.0.0.0"))
478                 )
479         {
480                 OPT(INTERFACE, CtdlGetConfigStr("c_ip_addr"));
481         }
482
483 #ifdef CURLOPT_HTTP_CONTENT_DECODING
484         OPT(HTTP_CONTENT_DECODING, 1);
485         OPT(ENCODING, "");
486 #endif
487
488         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
489                                                 "Connection: close");
490
491         return 1;
492 }
493
494
495 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
496                                            ev_cleanup *watcher,
497                                            int revents)
498 {
499         CURLMcode msta;
500         AsyncIO *IO = watcher->data;
501
502         if (IO == NULL)
503                 return;
504
505         SetEVState(IO, eCurlShutdown);
506         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
507         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
508
509         curl_slist_free_all(IO->HttpReq.headers);
510         IO->HttpReq.headers = NULL;
511         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
512         if (msta)
513         {
514                 EVCURL_syslog(LOG_ERR,
515                               "EVCURL: warning problem detaching completed handle "
516                               "from curl multi: %s\n",
517                               curl_multi_strerror(msta));
518         }
519
520         curl_easy_cleanup(IO->HttpReq.chnd);
521         IO->HttpReq.chnd = NULL;
522         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
523         ev_io_stop(event_base, &IO->recv_event);
524         ev_io_stop(event_base, &IO->send_event);
525         assert(IO->ShutdownAbort);
526         IO->ShutdownAbort(IO);
527 }
528
529 eNextState
530 evcurl_handle_start(AsyncIO *IO)
531 {
532         CURLMcode msta;
533         CURLcode sta;
534         CURL *chnd;
535
536         SetEVState(IO, eCurlStart);
537         chnd = IO->HttpReq.chnd;
538         EVCURL_syslog(LOG_DEBUG,
539                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
540         OPT(URL, IO->ConnectMe->PlainUrl);
541         if (StrLength(IO->ConnectMe->CurlCreds))
542         {
543                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
544                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
545         }
546         if (StrLength(IO->HttpReq.PostData) > 0)
547         {
548                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
549                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
550
551         }
552         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
553                  (IO->HttpReq.PlainPostData != NULL))
554         {
555                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
556                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
557         }
558         OPT(HTTPHEADER, IO->HttpReq.headers);
559
560         IO->NextState = eConnect;
561         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
562         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
563         if (msta)
564         {
565                 EVCURL_syslog(LOG_ERR,
566                           "EVCURL: error attaching to curl multi handle: %s\n",
567                           curl_multi_strerror(msta));
568         }
569
570         IO->HttpReq.attached = 1;
571         ev_async_send (event_base, &WakeupCurl);
572
573         ev_cleanup_init(&IO->abort_by_shutdown,
574                         IOcurl_abort_shutdown_callback);
575
576         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
577
578         return eReadMessage;
579 }
580
581 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
582 {
583         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
584
585         curl_multi_perform(&global, CURL_POLL_NONE);
586 }
587
588 static void evcurl_shutdown (void)
589 {
590         curl_global_cleanup();
591         curl_multi_cleanup(global.mhnd);
592         CURLM_syslog(LOG_DEBUG, "exiting\n");
593 }
594 /*****************************************************************************
595  *                       libevent integration                                *
596  *****************************************************************************/
597 /*
598  * Client event queue plus its methods.
599  * This is currently the main loop (which may change in the future).
600  */
601 int evbase_count = 0;
602 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
603 pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
604 HashList *QueueEvents = NULL;
605 HashList *InboundEventQueue = NULL;
606 HashList *InboundEventQueues[2] = { NULL, NULL };
607 extern void ShutDownCLient(AsyncIO *IO);
608
609 ev_async AddJob;
610 ev_async ExitEventLoop;
611
612 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
613 {
614         CitContext *Ctx;
615         long IOID = -1;
616         long count = 0;
617         ev_tstamp Now;
618         HashList *q;
619         void *v;
620         HashPos*It;
621         long len;
622         const char *Key;
623
624         /* get the control command... */
625         pthread_mutex_lock(&EventQueueMutex);
626
627         if (InboundEventQueues[0] == InboundEventQueue) {
628                 InboundEventQueue = InboundEventQueues[1];
629                 q = InboundEventQueues[0];
630         }
631         else {
632                 InboundEventQueue = InboundEventQueues[0];
633                 q = InboundEventQueues[1];
634         }
635         pthread_mutex_unlock(&EventQueueMutex);
636         Now = ev_now (event_base);
637         It = GetNewHashPos(q, 0);
638         while (GetNextHashPos(q, It, &len, &Key, &v))
639         {
640                 IOAddHandler *h = v;
641                 count ++;
642                 if (h->IO->ID == 0) {
643                         h->IO->ID = EvIDSource++;
644                 }
645                 IOID = h->IO->ID;
646                 if (h->IO->StartIO == 0.0)
647                         h->IO->StartIO = Now;
648
649                 SetEVState(h->IO, eIOAttach);
650
651                 Ctx = h->IO->CitContext;
652                 become_session(Ctx);
653
654                 h->IO->CitContext->lastcmd = h->IO->Now = Now;
655                 switch (h->EvAttch(h->IO))
656                 {
657                 case eReadMore:
658                 case eReadMessage:
659                 case eReadFile:
660                 case eSendReply:
661                 case eSendMore:
662                 case eReadPayload:
663                 case eSendFile:
664                 case eDBQuery:
665                 case eSendDNSQuery:
666                 case eReadDNSReply:
667                 case eConnect:
668                         break;
669                 case eTerminateConnection:
670                 case eAbort:
671                         ShutDownCLient(h->IO);
672                 break;
673                 }
674         }
675         DeleteHashPos(&It);
676         DeleteHashContent(&q);
677         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld  done.", IOSTR, IOID, count);
678 }
679
680
681 static void EventExitCallback(EV_P_ ev_async *w, int revents)
682 {
683         ev_break(event_base, EVBREAK_ALL);
684
685         EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
686 }
687
688
689
690 void InitEventQueue(void)
691 {
692         pthread_mutex_init(&EventQueueMutex, NULL);
693         pthread_mutex_init(&EventExitQueueMutex, NULL);
694
695         QueueEvents = NewHash(1, Flathash);
696         InboundEventQueues[0] = NewHash(1, Flathash);
697         InboundEventQueues[1] = NewHash(1, Flathash);
698         InboundEventQueue = InboundEventQueues[0];
699 }
700 extern void CtdlDestroyEVCleanupHooks(void);
701
702 extern int EVQShutDown;
703 const char *IOLog = "IO";
704 /*
705  * this thread operates the select() etc. via libev.
706  */
707 void *client_event_thread(void *arg) 
708 {
709         struct CitContext libev_client_CC;
710
711         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
712
713         pthread_setspecific(evConKey, IOLog);
714
715         EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");
716
717         event_base = ev_default_loop (EVFLAG_AUTO);
718         ev_async_init(&AddJob, QueueEventAddCallback);
719         ev_async_start(event_base, &AddJob);
720         ev_async_init(&ExitEventLoop, EventExitCallback);
721         ev_async_start(event_base, &ExitEventLoop);
722         ev_async_init(&WakeupCurl, WakeupCurlCallback);
723         ev_async_start(event_base, &WakeupCurl);
724
725         curl_init_connectionpool();
726
727         ev_run (event_base, 0);
728
729         EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
730
731 ///what todo here?      CtdlClearSystemContext();
732         pthread_mutex_lock(&EventExitQueueMutex);
733         ev_loop_destroy (EV_DEFAULT_UC);
734         event_base = NULL;
735         DeleteHash(&QueueEvents);
736         InboundEventQueue = NULL;
737         DeleteHash(&InboundEventQueues[0]);
738         DeleteHash(&InboundEventQueues[1]);
739 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
740         evcurl_shutdown();
741
742         CtdlDestroyEVCleanupHooks();
743
744         pthread_mutex_unlock(&EventExitQueueMutex);
745         EVQShutDown = 1;
746         return(NULL);
747 }
748
749 /*----------------------------------------------------------------------------*/
750 /*
751  * DB-Queue; does async bdb operations.
752  * has its own set of handlers.
753  */
754 ev_loop *event_db;
755 int evdb_count = 0;
756 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
757 pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
758 HashList *DBQueueEvents = NULL;
759 HashList *DBInboundEventQueue = NULL;
760 HashList *DBInboundEventQueues[2] = { NULL, NULL };
761
762 ev_async DBAddJob;
763 ev_async DBExitEventLoop;
764
765 extern void ShutDownDBCLient(AsyncIO *IO);
766
767 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
768 {
769         CitContext *Ctx;
770         long IOID = -1;
771         long count = 0;;
772         ev_tstamp Now;
773         HashList *q;
774         void *v;
775         HashPos *It;
776         long len;
777         const char *Key;
778
779         /* get the control command... */
780         pthread_mutex_lock(&DBEventQueueMutex);
781
782         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
783                 DBInboundEventQueue = DBInboundEventQueues[1];
784                 q = DBInboundEventQueues[0];
785         }
786         else {
787                 DBInboundEventQueue = DBInboundEventQueues[0];
788                 q = DBInboundEventQueues[1];
789         }
790         pthread_mutex_unlock(&DBEventQueueMutex);
791
792         Now = ev_now (event_db);
793         It = GetNewHashPos(q, 0);
794         while (GetNextHashPos(q, It, &len, &Key, &v))
795         {
796                 IOAddHandler *h = v;
797                 eNextState rc;
798                 count ++;
799                 if (h->IO->ID == 0)
800                         h->IO->ID = EvIDSource++;
801                 IOID = h->IO->ID;
802                 if (h->IO->StartDB == 0.0)
803                         h->IO->StartDB = Now;
804                 h->IO->CitContext->lastcmd = h->IO->Now = Now;
805
806                 SetEVState(h->IO, eDBAttach);
807                 Ctx = h->IO->CitContext;
808                 become_session(Ctx);
809                 ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
810                 rc = h->EvAttch(h->IO);
811                 switch (rc)
812                 {
813                 case eAbort:
814                         ShutDownDBCLient(h->IO);
815                 default:
816                         break;
817                 }
818         }
819         DeleteHashPos(&It);
820         DeleteHashContent(&q);
821         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count);
822 }
823
824
825 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
826 {
827         EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
828         ev_break(event_db, EVBREAK_ALL);
829 }
830
831
832
833 void DBInitEventQueue(void)
834 {
835         pthread_mutex_init(&DBEventQueueMutex, NULL);
836         pthread_mutex_init(&DBEventExitQueueMutex, NULL);
837
838         DBQueueEvents = NewHash(1, Flathash);
839         DBInboundEventQueues[0] = NewHash(1, Flathash);
840         DBInboundEventQueues[1] = NewHash(1, Flathash);
841         DBInboundEventQueue = DBInboundEventQueues[0];
842 }
843
844 const char *DBLog = "BD";
845
846 /*
847  * this thread operates writing to the message database via libev.
848  */
849 void *db_event_thread(void *arg)
850 {
851         ev_loop *tmp;
852         struct CitContext libev_msg_CC;
853
854         pthread_setspecific(evConKey, DBLog);
855
856         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
857
858         EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
859
860         tmp = event_db = ev_loop_new (EVFLAG_AUTO);
861
862         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
863         ev_async_start(event_db, &DBAddJob);
864         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
865         ev_async_start(event_db, &DBExitEventLoop);
866
867         ev_run (event_db, 0);
868
869         pthread_mutex_lock(&DBEventExitQueueMutex);
870
871         event_db = NULL;
872         EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n");
873
874         DeleteHash(&DBQueueEvents);
875         DBInboundEventQueue = NULL;
876         DeleteHash(&DBInboundEventQueues[0]);
877         DeleteHash(&DBInboundEventQueues[1]);
878
879 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
880
881         ev_loop_destroy (tmp);
882         pthread_mutex_unlock(&DBEventExitQueueMutex);
883         return(NULL);
884 }
885
886 void ShutDownEventQueues(void)
887 {
888         EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
889
890         pthread_mutex_lock(&DBEventQueueMutex);
891         ev_async_send (event_db, &DBExitEventLoop);
892         pthread_mutex_unlock(&DBEventQueueMutex);
893
894         pthread_mutex_lock(&EventQueueMutex);
895         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
896         pthread_mutex_unlock(&EventQueueMutex);
897 }
898
899 void DebugEventloopEnable(const int n)
900 {
901         DebugEventLoop = n;
902 }
903 void DebugEventloopBacktraceEnable(const int n)
904 {
905         DebugEventLoopBacktrace = n;
906 }
907
908 void DebugCurlEnable(const int n)
909 {
910         DebugCurl = n;
911 }
912
913 const char *WLog = "WX";
914 CTDL_MODULE_INIT(event_client)
915 {
916         if (!threading)
917         {
918                 if (pthread_key_create(&evConKey, NULL) != 0) {
919                         syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno));
920                 }
921                 pthread_setspecific(evConKey, WLog);
922
923                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
924                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
925                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
926                 InitEventQueue();
927                 DBInitEventQueue();
928                 CtdlThreadCreate(client_event_thread);
929                 CtdlThreadCreate(db_event_thread);
930         }
931         return "event";
932 }