In most cases only one IO context is added per pass, so log its ID.
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63 int DebugEventLoop = 0;
64 int DebugEventLoopBacktrace = 0;
65 int DebugCurl = 0;
66 pthread_key_t evConKey;
67
68 long EvIDSource = 1;
69 /*****************************************************************************
70  *                   libevent / curl integration                             *
71  *****************************************************************************/
/*
 * Logging helpers for the curl integration.
 *
 * DBGLOG(LEVEL) is a bare "if" guard: LOG_DEBUG messages pass only while
 * DebugCurl is non-zero, every other level always passes.
 *
 * The four *_syslog() wrappers are enclosed in do { } while (0) so that each
 * one is a single statement; without that, an "else" written after a wrapper
 * call would silently attach to the guard hidden inside the macro.
 *
 * EVCURL_syslog / EVCURLM_syslog need an `IO` variable in scope (plus the
 * IOSTR and CCID macros); the ...M_ variants are for messages that take no
 * printf arguments.
 */
#define DBGLOG(LEVEL) if (((LEVEL) != LOG_DEBUG) || (DebugCurl != 0))

#define EVCURL_syslog(LEVEL, FORMAT, ...)                               \
        do {                                                            \
                DBGLOG (LEVEL) syslog(LEVEL,                            \
                                      "EVCURL:%s[%ld]CC[%d] " FORMAT,   \
                                      IOSTR, IO->ID, CCID,              \
                                      __VA_ARGS__);                     \
        } while (0)

#define EVCURLM_syslog(LEVEL, FORMAT)                                   \
        do {                                                            \
                DBGLOG (LEVEL) syslog(LEVEL,                            \
                                      "EVCURL:%s[%ld]CC[%d] " FORMAT,   \
                                      IOSTR, IO->ID, CCID);             \
        } while (0)

#define CURL_syslog(LEVEL, FORMAT, ...)                                 \
        do {                                                            \
                DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT,           \
                                      __VA_ARGS__);                     \
        } while (0)

#define CURLM_syslog(LEVEL, FORMAT)                                     \
        do {                                                            \
                DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT);          \
        } while (0)
87
/*
 * MOPT(s, v): set option CURLMOPT_<s> to v on the multi handle `mhnd`
 * (which must be in scope at the call site).  Any failure is logged and
 * terminates the process with exit status 1.
 *
 * curl_multi_setopt() reports a CURLMcode, so the result is kept in a local
 * of that type and translated with curl_multi_strerror().
 */
#define MOPT(s, v)                                                      \
        do {                                                            \
                CURLMcode mopt_sta_ =                                   \
                        curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));   \
                if (mopt_sta_ != CURLM_OK) {                            \
                        EVQ_syslog(LOG_ERR, "error setting option "     \
                               #s " on curl multi handle: %s\n",        \
                               curl_multi_strerror(mopt_sta_));         \
                        exit (1);                                       \
                }                                                       \
        } while (0)
98
99
100 typedef struct _evcurl_global_data {
101         int magic;
102         CURLM *mhnd;
103         ev_timer timeev;
104         int nrun;
105 } evcurl_global_data;
106
107 ev_async WakeupCurl;
108 evcurl_global_data global;
109
110 static void
111 gotstatus(int nnrun)
112 {
113         CURLMsg *msg;
114         int nmsg;
115
116         global.nrun = nnrun;
117
118         CURLM_syslog(LOG_DEBUG,
119                      "gotstatus(): about to call curl_multi_info_read\n");
120         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
121                 CURL_syslog(LOG_DEBUG,
122                             "got curl multi_info message msg=%d\n",
123                             msg->msg);
124
125                 if (CURLMSG_DONE == msg->msg) {
126                         CURL *chnd;
127                         char *chandle = NULL;
128                         CURLcode sta;
129                         CURLMcode msta;
130                         AsyncIO*IO;
131
132                         chandle = NULL;;
133                         chnd = msg->easy_handle;
134                         sta = curl_easy_getinfo(chnd,
135                                                 CURLINFO_PRIVATE,
136                                                 &chandle);
137                         if (sta) {
138                                 syslog(LOG_ERR,
139                                        "error asking curl for private"
140                                        " cookie of curl handle: %s\n",
141                                        curl_easy_strerror(sta));
142                                 continue;
143                         }
144                         IO = (AsyncIO *)chandle;
145                         if (IO->ID == 0) {
146                                 EVCURL_syslog(LOG_ERR,
147                                               "Error, invalid IO context %p\n",
148                                               IO);
149                                 continue;
150                         }
151                         SetEVState(IO, eCurlGotStatus);
152
153                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
154
155                         IO->Now = ev_now(event_base);
156
157                         ev_io_stop(event_base, &IO->recv_event);
158                         ev_io_stop(event_base, &IO->send_event);
159
160                         sta = msg->data.result;
161                         if (sta) {
162                                 EVCURL_syslog(LOG_ERR,
163                                               "error description: %s\n",
164                                               IO->HttpReq.errdesc);
165                                 IO->HttpReq.CurlError = curl_easy_strerror(sta);
166                                 EVCURL_syslog(LOG_ERR,
167                                               "error performing request: %s\n",
168                                               IO->HttpReq.CurlError);
169                                 if (sta == CURLE_OPERATION_TIMEDOUT)
170                                 {
171                                         IO->SendBuf.fd = 0;
172                                         IO->RecvBuf.fd = 0;
173                                 }
174                         }
175                         sta = curl_easy_getinfo(chnd,
176                                                 CURLINFO_RESPONSE_CODE,
177                                                 &IO->HttpReq.httpcode);
178                         if (sta)
179                                 EVCURL_syslog(LOG_ERR,
180                                               "error asking curl for "
181                                               "response code from request: %s\n",
182                                               curl_easy_strerror(sta));
183                         EVCURL_syslog(LOG_DEBUG,
184                                       "http response code was %ld\n",
185                                       (long)IO->HttpReq.httpcode);
186
187
188                         curl_slist_free_all(IO->HttpReq.headers);
189                         msta = curl_multi_remove_handle(global.mhnd, chnd);
190                         if (msta)
191                                 EVCURL_syslog(LOG_ERR,
192                                               "warning problem detaching "
193                                               "completed handle from curl multi: "
194                                               "%s\n",
195                                               curl_multi_strerror(msta));
196
197                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
198
199                         IO->HttpReq.attached = 0;
200                         switch(IO->SendDone(IO))
201                         {
202                         case eDBQuery:
203                         case eSendDNSQuery:
204                         case eReadDNSReply:
205                         case eConnect:
206                         case eSendReply:
207                         case eSendMore:
208                         case eSendFile:
209                         case eReadMessage:
210                         case eReadMore:
211                         case eReadPayload:
212                         case eReadFile:
213                                 break;
214                         case eTerminateConnection:
215                         case eAbort:
216                                 curl_easy_cleanup(IO->HttpReq.chnd);
217                                 IO->HttpReq.chnd = NULL;
218                                 FreeStrBuf(&IO->HttpReq.ReplyData);
219                                 FreeURL(&IO->ConnectMe);
220                                 RemoveContext(IO->CitContext);
221                                 IO->Terminate(IO);
222                         }
223                 }
224         }
225 }
226
227 static void
228 stepmulti(void *data, curl_socket_t fd, int which)
229 {
230         int running_handles = 0;
231         CURLMcode msta;
232
233         msta = curl_multi_socket_action(global.mhnd,
234                                         fd,
235                                         which,
236                                         &running_handles);
237
238         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
239         if (msta)
240                 CURL_syslog(LOG_ERR,
241                             "error in curl processing events"
242                             "on multi handle, fd %d: %s\n",
243                             (int)fd,
244                             curl_multi_strerror(msta));
245
246         if (global.nrun != running_handles)
247                 gotstatus(running_handles);
248 }
249
250 static void
251 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
252 {
253         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
254         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
255 }
256
257 static void
258 got_in(struct ev_loop *loop, ev_io *ioev, int events)
259 {
260         CURL_syslog(LOG_DEBUG,
261                     "EVCURL: waking up curl for io on fd %d\n",
262                     (int)ioev->fd);
263
264         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
265 }
266
267 static void
268 got_out(struct ev_loop *loop, ev_io *ioev, int events)
269 {
270         CURL_syslog(LOG_DEBUG,
271                     "waking up curl for io on fd %d\n",
272                     (int)ioev->fd);
273
274         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
275 }
276
277 static size_t
278 gotdata(void *data, size_t size, size_t nmemb, void *cglobal)
279 {
280         AsyncIO *IO = (AsyncIO*) cglobal;
281
282         SetEVState(IO, eCurlGotData);
283         if (IO->HttpReq.ReplyData == NULL)
284         {
285                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
286         }
287         IO->Now = ev_now(event_base);
288         return CurlFillStrBuf_callback(data,
289                                        size,
290                                        nmemb,
291                                        IO->HttpReq.ReplyData);
292 }
293
294 static int
295 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
296         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
297         evcurl_global_data *global = cglobal;
298         ev_timer_stop(EV_DEFAULT, &global->timeev);
299         if (tblock_ms < 0 || 14000 < tblock_ms)
300                 tblock_ms = 14000;
301         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
302         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
303         curl_multi_perform(global, &global->nrun);
304         return 0;
305 }
306
307 static int
308 gotwatchsock(CURL *easy,
309              curl_socket_t fd,
310              int action,
311              void *cglobal,
312              void *vIO)
313 {
314         evcurl_global_data *global = cglobal;
315         CURLM *mhnd = global->mhnd;
316         char *f;
317         AsyncIO *IO = (AsyncIO*) vIO;
318         CURLcode sta;
319         const char *Action;
320
321         if (IO == NULL) {
322                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
323                 if (sta) {
324                         CURL_syslog(LOG_ERR,
325                                     "EVCURL: error asking curl for private "
326                                     "cookie of curl handle: %s\n",
327                                     curl_easy_strerror(sta));
328                         return -1;
329                 }
330                 IO = (AsyncIO *) f;
331                 SetEVState(IO, eCurlNewIO);
332                 EVCURL_syslog(LOG_DEBUG,
333                               "EVCURL: got socket for URL: %s\n",
334                               IO->ConnectMe->PlainUrl);
335
336                 if (IO->SendBuf.fd != 0)
337                 {
338                         ev_io_stop(event_base, &IO->recv_event);
339                         ev_io_stop(event_base, &IO->send_event);
340                 }
341                 IO->SendBuf.fd = fd;
342                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
343                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
344                 curl_multi_assign(mhnd, fd, IO);
345         }
346
347         SetEVState(IO, eCurlGotIO);
348         IO->Now = ev_now(event_base);
349
350         Action = "";
351         switch (action)
352         {
353         case CURL_POLL_NONE:
354                 Action = "CURL_POLL_NONE";
355                 break;
356         case CURL_POLL_REMOVE:
357                 Action = "CURL_POLL_REMOVE";
358                 break;
359         case CURL_POLL_IN:
360                 Action = "CURL_POLL_IN";
361                 break;
362         case CURL_POLL_OUT:
363                 Action = "CURL_POLL_OUT";
364                 break;
365         case CURL_POLL_INOUT:
366                 Action = "CURL_POLL_INOUT";
367                 break;
368         }
369
370
371         EVCURL_syslog(LOG_DEBUG,
372                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
373                       (int)fd, Action, action);
374
375         switch (action)
376         {
377         case CURL_POLL_NONE:
378                 EVCURLM_syslog(LOG_DEBUG,
379                                "called first time "
380                                "to register this sockwatcker\n");
381                 break;
382         case CURL_POLL_REMOVE:
383                 EVCURLM_syslog(LOG_DEBUG,
384                                "called last time to unregister "
385                                "this sockwatcher\n");
386                 ev_io_stop(event_base, &IO->recv_event);
387                 ev_io_stop(event_base, &IO->send_event);
388                 break;
389         case CURL_POLL_IN:
390                 ev_io_start(event_base, &IO->recv_event);
391                 ev_io_stop(event_base, &IO->send_event);
392                 break;
393         case CURL_POLL_OUT:
394                 ev_io_start(event_base, &IO->send_event);
395                 ev_io_stop(event_base, &IO->recv_event);
396                 break;
397         case CURL_POLL_INOUT:
398                 ev_io_start(event_base, &IO->send_event);
399                 ev_io_start(event_base, &IO->recv_event);
400                 break;
401         }
402         return 0;
403 }
404
405 void curl_init_connectionpool(void)
406 {
407         CURLM *mhnd ;
408
409         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
410         global.timeev.data = (void *)&global;
411         global.nrun = -1;
412         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
413
414         if (sta)
415         {
416                 CURL_syslog(LOG_ERR,
417                             "error initializing curl library: %s\n",
418                             curl_easy_strerror(sta));
419
420                 exit(1);
421         }
422         mhnd = global.mhnd = curl_multi_init();
423         if (!mhnd)
424         {
425                 CURLM_syslog(LOG_ERR,
426                              "error initializing curl multi handle\n");
427                 exit(3);
428         }
429
430         MOPT(SOCKETFUNCTION, &gotwatchsock);
431         MOPT(SOCKETDATA, (void *)&global);
432         MOPT(TIMERFUNCTION, &gotwatchtime);
433         MOPT(TIMERDATA, (void *)&global);
434
435         return;
436 }
437
438 int evcurl_init(AsyncIO *IO)
439 {
440         CURLcode sta;
441         CURL *chnd;
442
443         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
444         IO->HttpReq.attached = 0;
445         chnd = IO->HttpReq.chnd = curl_easy_init();
446         if (!chnd)
447         {
448                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
449                 return 0;
450         }
451
452 #if DEBUG
453         OPT(VERBOSE, (long)1);
454 #endif
455         OPT(NOPROGRESS, 1L);
456
457         OPT(NOSIGNAL, 1L);
458         OPT(FAILONERROR, (long)1);
459         OPT(ENCODING, "");
460         OPT(FOLLOWLOCATION, (long)0);
461         OPT(MAXREDIRS, (long)0);
462         OPT(USERAGENT, CITADEL);
463
464         OPT(TIMEOUT, (long)1800);
465         OPT(LOW_SPEED_LIMIT, (long)64);
466         OPT(LOW_SPEED_TIME, (long)600);
467         OPT(CONNECTTIMEOUT, (long)600);
468         OPT(PRIVATE, (void *)IO);
469
470         OPT(FORBID_REUSE, 1);
471         OPT(WRITEFUNCTION, &gotdata);
472         OPT(WRITEDATA, (void *)IO);
473         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
474
475         if ((!IsEmptyStr(config.c_ip_addr))
476                 && (strcmp(config.c_ip_addr, "*"))
477                 && (strcmp(config.c_ip_addr, "::"))
478                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
479                 )
480         {
481                 OPT(INTERFACE, config.c_ip_addr);
482         }
483
484 #ifdef CURLOPT_HTTP_CONTENT_DECODING
485         OPT(HTTP_CONTENT_DECODING, 1);
486         OPT(ENCODING, "");
487 #endif
488
489         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
490                                                 "Connection: close");
491
492         return 1;
493 }
494
495
496 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
497                                            ev_cleanup *watcher,
498                                            int revents)
499 {
500         CURLMcode msta;
501         AsyncIO *IO = watcher->data;
502
503         if (IO == NULL)
504                 return;
505
506         SetEVState(IO, eCurlShutdown);
507         IO->Now = ev_now(event_base);
508         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
509
510         curl_slist_free_all(IO->HttpReq.headers);
511         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
512         if (msta)
513         {
514                 EVCURL_syslog(LOG_ERR,
515                               "EVCURL: warning problem detaching completed handle "
516                               "from curl multi: %s\n",
517                               curl_multi_strerror(msta));
518         }
519
520         curl_easy_cleanup(IO->HttpReq.chnd);
521         IO->HttpReq.chnd = NULL;
522         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
523         ev_io_stop(event_base, &IO->recv_event);
524         ev_io_stop(event_base, &IO->send_event);
525         assert(IO->ShutdownAbort);
526         IO->ShutdownAbort(IO);
527 }
528 eNextState
529 evcurl_handle_start(AsyncIO *IO)
530 {
531         CURLMcode msta;
532         CURLcode sta;
533         CURL *chnd;
534
535         SetEVState(IO, eCurlStart);
536         chnd = IO->HttpReq.chnd;
537         EVCURL_syslog(LOG_DEBUG,
538                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
539         OPT(URL, IO->ConnectMe->PlainUrl);
540         if (StrLength(IO->ConnectMe->CurlCreds))
541         {
542                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
543                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
544         }
545         if (StrLength(IO->HttpReq.PostData) > 0)
546         {
547                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
548                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
549
550         }
551         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
552                  (IO->HttpReq.PlainPostData != NULL))
553         {
554                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
555                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
556         }
557         OPT(HTTPHEADER, IO->HttpReq.headers);
558
559         IO->NextState = eConnect;
560         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
561         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
562         if (msta)
563         {
564                 EVCURL_syslog(LOG_ERR,
565                           "EVCURL: error attaching to curl multi handle: %s\n",
566                           curl_multi_strerror(msta));
567         }
568
569         IO->HttpReq.attached = 1;
570         ev_async_send (event_base, &WakeupCurl);
571
572         ev_cleanup_init(&IO->abort_by_shutdown,
573                         IOcurl_abort_shutdown_callback);
574
575         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
576
577         return eReadMessage;
578 }
579
580 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
581 {
582         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
583
584         curl_multi_perform(&global, CURL_POLL_NONE);
585 }
586
587 static void evcurl_shutdown (void)
588 {
589         curl_global_cleanup();
590         curl_multi_cleanup(global.mhnd);
591         CURLM_syslog(LOG_DEBUG, "exiting\n");
592 }
593 /*****************************************************************************
594  *                       libevent integration                                *
595  *****************************************************************************/
596 /*
597  * client event queue plus its methods.
598  * this currently is the main loop (which may change in some future?)
599  */
600 int evbase_count = 0;
601 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
602 pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
603 HashList *QueueEvents = NULL;
604 HashList *InboundEventQueue = NULL;
605 HashList *InboundEventQueues[2] = { NULL, NULL };
606 extern void ShutDownCLient(AsyncIO *IO);
607
608 ev_async AddJob;
609 ev_async ExitEventLoop;
610
611 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
612 {
613         CitContext *Ctx;
614         long IOID = -1;
615         long count = 0;
616         ev_tstamp Now;
617         HashList *q;
618         void *v;
619         HashPos*It;
620         long len;
621         const char *Key;
622
623         /* get the control command... */
624         pthread_mutex_lock(&EventQueueMutex);
625
626         if (InboundEventQueues[0] == InboundEventQueue) {
627                 InboundEventQueue = InboundEventQueues[1];
628                 q = InboundEventQueues[0];
629         }
630         else {
631                 InboundEventQueue = InboundEventQueues[0];
632                 q = InboundEventQueues[1];
633         }
634         pthread_mutex_unlock(&EventQueueMutex);
635         Now = ev_now (event_base);
636         It = GetNewHashPos(q, 0);
637         while (GetNextHashPos(q, It, &len, &Key, &v))
638         {
639                 IOAddHandler *h = v;
640                 count ++;
641                 if (h->IO->ID == 0) {
642                         h->IO->ID = EvIDSource++;
643                 }
644                 IOID = h->IO->ID;
645                 if (h->IO->StartIO == 0.0)
646                         h->IO->StartIO = Now;
647
648                 SetEVState(h->IO, eIOAttach);
649
650                 Ctx = h->IO->CitContext;
651                 become_session(Ctx);
652
653                 h->IO->Now = Now;
654                 switch (h->EvAttch(h->IO))
655                 {
656                 case eReadMore:
657                 case eReadMessage:
658                 case eReadFile:
659                 case eSendReply:
660                 case eSendMore:
661                 case eReadPayload:
662                 case eSendFile:
663                 case eDBQuery:
664                 case eSendDNSQuery:
665                 case eReadDNSReply:
666                 case eConnect:
667                         break;
668                 case eTerminateConnection:
669                 case eAbort:
670                         ShutDownCLient(h->IO);
671                 break;
672                 }
673         }
674         DeleteHashPos(&It);
675         DeleteHashContent(&q);
676         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld  done.", IOSTR, IOID, count);
677 }
678
679
680 static void EventExitCallback(EV_P_ ev_async *w, int revents)
681 {
682         ev_break(event_base, EVBREAK_ALL);
683
684         EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
685 }
686
687
688
689 void InitEventQueue(void)
690 {
691         pthread_mutex_init(&EventQueueMutex, NULL);
692         pthread_mutex_init(&EventExitQueueMutex, NULL);
693
694         QueueEvents = NewHash(1, Flathash);
695         InboundEventQueues[0] = NewHash(1, Flathash);
696         InboundEventQueues[1] = NewHash(1, Flathash);
697         InboundEventQueue = InboundEventQueues[0];
698 }
699 extern void CtdlDestroyEVCleanupHooks(void);
700
701 extern int EVQShutDown;
702 const char *IOLog = "IO";
703 /*
704  * this thread operates the select() etc. via libev.
705  */
706 void *client_event_thread(void *arg) 
707 {
708         struct CitContext libev_client_CC;
709
710         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
711
712         pthread_setspecific(evConKey, IOLog);
713
714         EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");
715
716         event_base = ev_default_loop (EVFLAG_AUTO);
717         ev_async_init(&AddJob, QueueEventAddCallback);
718         ev_async_start(event_base, &AddJob);
719         ev_async_init(&ExitEventLoop, EventExitCallback);
720         ev_async_start(event_base, &ExitEventLoop);
721         ev_async_init(&WakeupCurl, WakeupCurlCallback);
722         ev_async_start(event_base, &WakeupCurl);
723
724         curl_init_connectionpool();
725
726         ev_run (event_base, 0);
727
728         EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
729
730 ///what todo here?      CtdlClearSystemContext();
731         pthread_mutex_lock(&EventExitQueueMutex);
732         ev_loop_destroy (EV_DEFAULT_UC);
733         event_base = NULL;
734         DeleteHash(&QueueEvents);
735         InboundEventQueue = NULL;
736         DeleteHash(&InboundEventQueues[0]);
737         DeleteHash(&InboundEventQueues[1]);
738 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
739         evcurl_shutdown();
740
741         CtdlDestroyEVCleanupHooks();
742
743         pthread_mutex_unlock(&EventExitQueueMutex);
744         EVQShutDown = 1;
745         return(NULL);
746 }
747
748 /*----------------------------------------------------------------------------*/
749 /*
750  * DB-Queue; does async bdb operations.
751  * has its own set of handlers.
752  */
753 ev_loop *event_db;
754 int evdb_count = 0;
755 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
756 pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
757 HashList *DBQueueEvents = NULL;
758 HashList *DBInboundEventQueue = NULL;
759 HashList *DBInboundEventQueues[2] = { NULL, NULL };
760
761 ev_async DBAddJob;
762 ev_async DBExitEventLoop;
763
764 extern void ShutDownDBCLient(AsyncIO *IO);
765
766 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
767 {
768         CitContext *Ctx;
769         long IOID = -1;
770         long count = 0;;
771         ev_tstamp Now;
772         HashList *q;
773         void *v;
774         HashPos *It;
775         long len;
776         const char *Key;
777
778         /* get the control command... */
779         pthread_mutex_lock(&DBEventQueueMutex);
780
781         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
782                 DBInboundEventQueue = DBInboundEventQueues[1];
783                 q = DBInboundEventQueues[0];
784         }
785         else {
786                 DBInboundEventQueue = DBInboundEventQueues[0];
787                 q = DBInboundEventQueues[1];
788         }
789         pthread_mutex_unlock(&DBEventQueueMutex);
790
791         Now = ev_now (event_db);
792         It = GetNewHashPos(q, 0);
793         while (GetNextHashPos(q, It, &len, &Key, &v))
794         {
795                 IOAddHandler *h = v;
796                 eNextState rc;
797                 count ++;
798                 if (h->IO->ID == 0)
799                         h->IO->ID = EvIDSource++;
800                 IOID = h->IO->ID;
801                 if (h->IO->StartDB == 0.0)
802                         h->IO->StartDB = Now;
803                 h->IO->Now = Now;
804
805                 SetEVState(h->IO, eDBAttach);
806                 Ctx = h->IO->CitContext;
807                 become_session(Ctx);
808                 ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
809                 rc = h->EvAttch(h->IO);
810                 switch (rc)
811                 {
812                 case eAbort:
813                         ShutDownDBCLient(h->IO);
814                 default:
815                         break;
816                 }
817         }
818         DeleteHashPos(&It);
819         DeleteHashContent(&q);
820         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count);
821 }
822
823
824 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
825 {
826         EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
827         ev_break(event_db, EVBREAK_ALL);
828 }
829
830
831
832 void DBInitEventQueue(void)
833 {
834         pthread_mutex_init(&DBEventQueueMutex, NULL);
835         pthread_mutex_init(&DBEventExitQueueMutex, NULL);
836
837         DBQueueEvents = NewHash(1, Flathash);
838         DBInboundEventQueues[0] = NewHash(1, Flathash);
839         DBInboundEventQueues[1] = NewHash(1, Flathash);
840         DBInboundEventQueue = DBInboundEventQueues[0];
841 }
842
843 const char *DBLog = "BD";
844
845 /*
846  * this thread operates writing to the message database via libev.
847  */
848 void *db_event_thread(void *arg)
849 {
850         ev_loop *tmp;
851         struct CitContext libev_msg_CC;
852
853         pthread_setspecific(evConKey, DBLog);
854
855         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
856
857         EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
858
859         tmp = event_db = ev_loop_new (EVFLAG_AUTO);
860
861         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
862         ev_async_start(event_db, &DBAddJob);
863         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
864         ev_async_start(event_db, &DBExitEventLoop);
865
866         ev_run (event_db, 0);
867
868         pthread_mutex_lock(&DBEventExitQueueMutex);
869
870         event_db = NULL;
871         EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n");
872
873         DeleteHash(&DBQueueEvents);
874         DBInboundEventQueue = NULL;
875         DeleteHash(&DBInboundEventQueues[0]);
876         DeleteHash(&DBInboundEventQueues[1]);
877
878 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
879
880         ev_loop_destroy (tmp);
881         pthread_mutex_unlock(&DBEventExitQueueMutex);
882         return(NULL);
883 }
884
885 void ShutDownEventQueues(void)
886 {
887         EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
888
889         pthread_mutex_lock(&DBEventQueueMutex);
890         ev_async_send (event_db, &DBExitEventLoop);
891         pthread_mutex_unlock(&DBEventQueueMutex);
892
893         pthread_mutex_lock(&EventQueueMutex);
894         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
895         pthread_mutex_unlock(&EventQueueMutex);
896 }
897
898 void DebugEventloopEnable(const int n)
899 {
900         DebugEventLoop = n;
901 }
902 void DebugEventloopBacktraceEnable(const int n)
903 {
904         DebugEventLoopBacktrace = n;
905 }
906
907 void DebugCurlEnable(const int n)
908 {
909         DebugCurl = n;
910 }
911
912 const char *WLog = "WX";
913 CTDL_MODULE_INIT(event_client)
914 {
915         if (!threading)
916         {
917                 if (pthread_key_create(&evConKey, NULL) != 0) {
918                         syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno));
919                 }
920                 pthread_setspecific(evConKey, WLog);
921
922                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
923                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
924                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
925                 InitEventQueue();
926                 DBInitEventQueue();
927                 CtdlThreadCreate(client_event_thread);
928                 CtdlThreadCreate(db_event_thread);
929         }
930         return "event";
931 }