22a715615c6ffac6aa32629eb857b657f5fa60ba
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2015 by the citadel.org team
3  *
4  * This program is open source software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 3.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  * GNU General Public License for more details.
11  */
12
13 #include "sysdep.h"
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <stdio.h>
17 #include <termios.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <pwd.h>
21 #include <errno.h>
22 #include <sys/types.h>
23 #include <syslog.h>
24
25 #if TIME_WITH_SYS_TIME
26 # include <sys/time.h>
27 # include <time.h>
28 #else
29 # if HAVE_SYS_TIME_H
30 #  include <sys/time.h>
31 # else
32 #  include <time.h>
33 # endif
34 #endif
35 #include <sys/wait.h>
36 #include <ctype.h>
37 #include <string.h>
38 #include <limits.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <assert.h>
42 #include <arpa/inet.h>
43 #include <libcitadel.h>
44 #include <curl/curl.h>
45 #include <curl/multi.h>
46 #include "citadel.h"
47 #include "server.h"
48 #include "citserver.h"
49 #include "support.h"
50 #include "ctdl_module.h"
51 #include "config.h"
52 #include "event_client.h"
53 #include "serv_curl.h"
54
55 ev_loop *event_base;
56 int DebugEventLoop = 0;
57 int DebugEventLoopBacktrace = 0;
58 int DebugCurl = 0;
59 pthread_key_t evConKey;
60
61 long EvIDSource = 1;
62 /*****************************************************************************
63  *                   libevent / curl integration                             *
64  *****************************************************************************/
/*
 * DBGLOG guards a single following statement: LOG_DEBUG messages are only
 * emitted when DebugCurl is non-zero, every other level always passes.
 */
#define DBGLOG(LEVEL) if (((LEVEL) != LOG_DEBUG) || (DebugCurl != 0))

/*
 * The logging macros below are wrapped in do { } while (0) so that each
 * expands to exactly one statement.  Without the wrapper the hidden "if"
 * from DBGLOG would capture an "else" written after the macro call.
 *
 * EVCURL_* variants expect a variable named IO to be in scope.
 * The *M_* variants take a format string without further arguments.
 */
#define EVCURL_syslog(LEVEL, FORMAT, ...)                               \
	do {                                                            \
		DBGLOG (LEVEL) syslog(LEVEL,                            \
				      "EVCURL:%s[%ld]CC[%d] " FORMAT,   \
				      IOSTR, IO->ID, CCID, __VA_ARGS__); \
	} while (0)

#define EVCURLM_syslog(LEVEL, FORMAT)                                   \
	do {                                                            \
		DBGLOG (LEVEL) syslog(LEVEL,                            \
				      "EVCURL:%s[%ld]CC[%d] " FORMAT,   \
				      IOSTR, IO->ID, CCID);             \
	} while (0)

#define CURL_syslog(LEVEL, FORMAT, ...)                                 \
	do {                                                            \
		DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT,           \
				      __VA_ARGS__);                     \
	} while (0)

#define CURLM_syslog(LEVEL, FORMAT)                                     \
	do {                                                            \
		DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT);          \
	} while (0)

/*
 * Set one option on the multi handle named mhnd in the calling scope and
 * terminate the process if libcurl rejects it.  curl_multi_setopt() returns
 * a CURLMcode, so the status lives in its own correctly typed local and is
 * rendered with curl_multi_strerror().
 */
#define MOPT(s, v)                                                      \
	do {                                                            \
		CURLMcode mopt_sta_ =                                   \
			curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));   \
		if (mopt_sta_) {                                        \
			EVQ_syslog(LOG_ERR, "error setting option "     \
			       #s " on curl multi handle: %s\n",        \
			       curl_multi_strerror(mopt_sta_));         \
			exit (1);                                       \
		}                                                       \
	} while (0)
91
92
93 typedef struct _evcurl_global_data {
94         int magic;
95         CURLM *mhnd;
96         ev_timer timeev;
97         int nrun;
98 } evcurl_global_data;
99
100 ev_async WakeupCurl;
101 evcurl_global_data global;
102
103 eNextState QueueAnDBOperation(AsyncIO *IO);
104
105 static void
106 gotstatus(int nnrun)
107 {
108         CURLMsg *msg;
109         int nmsg;
110
111         global.nrun = nnrun;
112
113         CURLM_syslog(LOG_DEBUG,
114                      "gotstatus(): about to call curl_multi_info_read\n");
115         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
116                 CURL_syslog(LOG_DEBUG,
117                             "got curl multi_info message msg=%d\n",
118                             msg->msg);
119
120                 if (CURLMSG_DONE == msg->msg) {
121                         CURL *chnd;
122                         void *chandle = NULL;
123                         CURLcode sta;
124                         CURLMcode msta;
125                         AsyncIO*IO;
126
127                         chandle = NULL;;
128                         chnd = msg->easy_handle;
129                         sta = curl_easy_getinfo(chnd,
130                                                 CURLINFO_PRIVATE,
131                                                 &chandle);
132                         if (sta) {
133                                 syslog(LOG_ERR,
134                                        "error asking curl for private"
135                                        " cookie of curl handle: %s\n",
136                                        curl_easy_strerror(sta));
137                                 continue;
138                         }
139                         IO = (AsyncIO *)chandle;
140                         if (IO->ID == 0) {
141                                 EVCURL_syslog(LOG_ERR,
142                                               "Error, invalid IO context %p\n",
143                                               IO);
144                                 continue;
145                         }
146                         SetEVState(IO, eCurlGotStatus);
147
148                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
149
150                         IO->Now = ev_now(event_base);
151
152                         ev_io_stop(event_base, &IO->recv_event);
153                         ev_io_stop(event_base, &IO->send_event);
154
155                         sta = msg->data.result;
156                         if (sta) {
157                                 EVCURL_syslog(LOG_ERR,
158                                               "error description: %s\n",
159                                               IO->HttpReq.errdesc);
160                                 IO->HttpReq.CurlError = curl_easy_strerror(sta);
161                                 EVCURL_syslog(LOG_ERR,
162                                               "error performing request: %s\n",
163                                               IO->HttpReq.CurlError);
164                                 if (sta == CURLE_OPERATION_TIMEDOUT)
165                                 {
166                                         IO->SendBuf.fd = 0;
167                                         IO->RecvBuf.fd = 0;
168                                 }
169                         }
170                         sta = curl_easy_getinfo(chnd,
171                                                 CURLINFO_RESPONSE_CODE,
172                                                 &IO->HttpReq.httpcode);
173                         if (sta)
174                                 EVCURL_syslog(LOG_ERR,
175                                               "error asking curl for "
176                                               "response code from request: %s\n",
177                                               curl_easy_strerror(sta));
178                         EVCURL_syslog(LOG_DEBUG,
179                                       "http response code was %ld\n",
180                                       (long)IO->HttpReq.httpcode);
181
182
183                         curl_slist_free_all(IO->HttpReq.headers);
184                         msta = curl_multi_remove_handle(global.mhnd, chnd);
185                         if (msta)
186                                 EVCURL_syslog(LOG_ERR,
187                                               "warning problem detaching "
188                                               "completed handle from curl multi: "
189                                               "%s\n",
190                                               curl_multi_strerror(msta));
191
192                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
193
194                         IO->HttpReq.attached = 0;
195                         switch(IO->SendDone(IO))
196                         {
197                         case eDBQuery:
198                                 FreeURL(&IO->ConnectMe);
199                                 QueueAnDBOperation(IO);
200                                 break;
201                         case eSendDNSQuery:
202                         case eReadDNSReply:
203                         case eConnect:
204                         case eSendReply:
205                         case eSendMore:
206                         case eSendFile:
207                         case eReadMessage:
208                         case eReadMore:
209                         case eReadPayload:
210                         case eReadFile:
211                                 break;
212                         case eTerminateConnection:
213                         case eAbort:
214                                 curl_easy_cleanup(IO->HttpReq.chnd);
215                                 IO->HttpReq.chnd = NULL;
216                                 FreeStrBuf(&IO->HttpReq.ReplyData);
217                                 FreeURL(&IO->ConnectMe);
218                                 RemoveContext(IO->CitContext);
219                                 IO->Terminate(IO);
220                         }
221                 }
222         }
223 }
224
225 static void
226 stepmulti(void *data, curl_socket_t fd, int which)
227 {
228         int running_handles = 0;
229         CURLMcode msta;
230
231         msta = curl_multi_socket_action(global.mhnd,
232                                         fd,
233                                         which,
234                                         &running_handles);
235
236         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
237         if (msta)
238                 CURL_syslog(LOG_ERR,
239                             "error in curl processing events"
240                             "on multi handle, fd %d: %s\n",
241                             (int)fd,
242                             curl_multi_strerror(msta));
243
244         if (global.nrun != running_handles)
245                 gotstatus(running_handles);
246 }
247
248 static void
249 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
250 {
251         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
252         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
253 }
254
255 static void
256 got_in(struct ev_loop *loop, ev_io *ioev, int events)
257 {
258         CURL_syslog(LOG_DEBUG,
259                     "EVCURL: waking up curl for io on fd %d\n",
260                     (int)ioev->fd);
261
262         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
263 }
264
265 static void
266 got_out(struct ev_loop *loop, ev_io *ioev, int events)
267 {
268         CURL_syslog(LOG_DEBUG,
269                     "waking up curl for io on fd %d\n",
270                     (int)ioev->fd);
271
272         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
273 }
274
275 static size_t
276 gotdata(void *data, size_t size, size_t nmemb, void *cglobal)
277 {
278         AsyncIO *IO = (AsyncIO*) cglobal;
279
280         SetEVState(IO, eCurlGotData);
281         if (IO->HttpReq.ReplyData == NULL)
282         {
283                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
284         }
285         IO->Now = ev_now(event_base);
286         return CurlFillStrBuf_callback(data,
287                                        size,
288                                        nmemb,
289                                        IO->HttpReq.ReplyData);
290 }
291
292 static int
293 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
294         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
295         evcurl_global_data *global = cglobal;
296         ev_timer_stop(EV_DEFAULT, &global->timeev);
297         if (tblock_ms < 0 || 14000 < tblock_ms)
298                 tblock_ms = 14000;
299         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
300         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
301         curl_multi_perform(global, &global->nrun);
302         return 0;
303 }
304
305 static int
306 gotwatchsock(CURL *easy,
307              curl_socket_t fd,
308              int action,
309              void *cglobal,
310              void *vIO)
311 {
312         evcurl_global_data *global = cglobal;
313         CURLM *mhnd = global->mhnd;
314         void *f;
315         AsyncIO *IO = (AsyncIO*) vIO;
316         CURLcode sta;
317         const char *Action;
318
319         if (IO == NULL) {
320                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
321                 if (sta) {
322                         CURL_syslog(LOG_ERR,
323                                     "EVCURL: error asking curl for private "
324                                     "cookie of curl handle: %s\n",
325                                     curl_easy_strerror(sta));
326                         return -1;
327                 }
328                 IO = (AsyncIO *) f;
329                 SetEVState(IO, eCurlNewIO);
330                 EVCURL_syslog(LOG_DEBUG,
331                               "EVCURL: got socket for URL: %s\n",
332                               IO->ConnectMe->PlainUrl);
333
334                 if (IO->SendBuf.fd != 0)
335                 {
336                         ev_io_stop(event_base, &IO->recv_event);
337                         ev_io_stop(event_base, &IO->send_event);
338                 }
339                 IO->SendBuf.fd = fd;
340                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
341                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
342                 curl_multi_assign(mhnd, fd, IO);
343         }
344
345         SetEVState(IO, eCurlGotIO);
346         IO->Now = ev_now(event_base);
347
348         Action = "";
349         switch (action)
350         {
351         case CURL_POLL_NONE:
352                 Action = "CURL_POLL_NONE";
353                 break;
354         case CURL_POLL_REMOVE:
355                 Action = "CURL_POLL_REMOVE";
356                 break;
357         case CURL_POLL_IN:
358                 Action = "CURL_POLL_IN";
359                 break;
360         case CURL_POLL_OUT:
361                 Action = "CURL_POLL_OUT";
362                 break;
363         case CURL_POLL_INOUT:
364                 Action = "CURL_POLL_INOUT";
365                 break;
366         }
367
368
369         EVCURL_syslog(LOG_DEBUG,
370                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
371                       (int)fd, Action, action);
372
373         switch (action)
374         {
375         case CURL_POLL_NONE:
376                 EVCURLM_syslog(LOG_DEBUG,
377                                "called first time "
378                                "to register this sockwatcker\n");
379                 break;
380         case CURL_POLL_REMOVE:
381                 EVCURLM_syslog(LOG_DEBUG,
382                                "called last time to unregister "
383                                "this sockwatcher\n");
384                 ev_io_stop(event_base, &IO->recv_event);
385                 ev_io_stop(event_base, &IO->send_event);
386                 break;
387         case CURL_POLL_IN:
388                 ev_io_start(event_base, &IO->recv_event);
389                 ev_io_stop(event_base, &IO->send_event);
390                 break;
391         case CURL_POLL_OUT:
392                 ev_io_start(event_base, &IO->send_event);
393                 ev_io_stop(event_base, &IO->recv_event);
394                 break;
395         case CURL_POLL_INOUT:
396                 ev_io_start(event_base, &IO->send_event);
397                 ev_io_start(event_base, &IO->recv_event);
398                 break;
399         }
400         return 0;
401 }
402
403 void curl_init_connectionpool(void)
404 {
405         CURLM *mhnd ;
406
407         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
408         global.timeev.data = (void *)&global;
409         global.nrun = -1;
410         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
411
412         if (sta)
413         {
414                 CURL_syslog(LOG_ERR,
415                             "error initializing curl library: %s\n",
416                             curl_easy_strerror(sta));
417
418                 exit(1);
419         }
420         mhnd = global.mhnd = curl_multi_init();
421         if (!mhnd)
422         {
423                 CURLM_syslog(LOG_ERR,
424                              "error initializing curl multi handle\n");
425                 exit(3);
426         }
427
428         MOPT(SOCKETFUNCTION, &gotwatchsock);
429         MOPT(SOCKETDATA, (void *)&global);
430         MOPT(TIMERFUNCTION, &gotwatchtime);
431         MOPT(TIMERDATA, (void *)&global);
432
433         return;
434 }
435
436 int evcurl_init(AsyncIO *IO)
437 {
438         CURLcode sta;
439         CURL *chnd;
440
441         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
442         IO->HttpReq.attached = 0;
443         chnd = IO->HttpReq.chnd = curl_easy_init();
444         if (!chnd)
445         {
446                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
447                 return 0;
448         }
449
450 #if DEBUG
451         OPT(VERBOSE, (long)1);
452 #endif
453         OPT(NOPROGRESS, 1L);
454
455         OPT(NOSIGNAL, 1L);
456         OPT(FAILONERROR, (long)1);
457         OPT(ENCODING, "");
458         OPT(FOLLOWLOCATION, (long)0);
459         OPT(MAXREDIRS, (long)0);
460         OPT(USERAGENT, CITADEL);
461
462         OPT(TIMEOUT, (long)1800);
463         OPT(LOW_SPEED_LIMIT, (long)64);
464         OPT(LOW_SPEED_TIME, (long)600);
465         OPT(CONNECTTIMEOUT, (long)600);
466         OPT(PRIVATE, (void *)IO);
467
468         OPT(FORBID_REUSE, 1);
469         OPT(WRITEFUNCTION, &gotdata);
470         OPT(WRITEDATA, (void *)IO);
471         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
472
473         if ((!IsEmptyStr(CtdlGetConfigStr("c_ip_addr")))
474                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "*"))
475                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "::"))
476                 && (strcmp(CtdlGetConfigStr("c_ip_addr"), "0.0.0.0"))
477                 )
478         {
479                 OPT(INTERFACE, CtdlGetConfigStr("c_ip_addr"));
480         }
481
482 #ifdef CURLOPT_HTTP_CONTENT_DECODING
483         OPT(HTTP_CONTENT_DECODING, 1);
484         OPT(ENCODING, "");
485 #endif
486
487         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
488                                                 "Connection: close");
489
490         return 1;
491 }
492
493
494 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
495                                            ev_cleanup *watcher,
496                                            int revents)
497 {
498         CURLMcode msta;
499         AsyncIO *IO = watcher->data;
500
501         if (IO == NULL)
502                 return;
503
504         SetEVState(IO, eCurlShutdown);
505         IO->Now = ev_now(event_base);
506         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
507
508         curl_slist_free_all(IO->HttpReq.headers);
509         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
510         if (msta)
511         {
512                 EVCURL_syslog(LOG_ERR,
513                               "EVCURL: warning problem detaching completed handle "
514                               "from curl multi: %s\n",
515                               curl_multi_strerror(msta));
516         }
517
518         curl_easy_cleanup(IO->HttpReq.chnd);
519         IO->HttpReq.chnd = NULL;
520         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
521         ev_io_stop(event_base, &IO->recv_event);
522         ev_io_stop(event_base, &IO->send_event);
523         assert(IO->ShutdownAbort);
524         IO->ShutdownAbort(IO);
525 }
526 eNextState
527 evcurl_handle_start(AsyncIO *IO)
528 {
529         CURLMcode msta;
530         CURLcode sta;
531         CURL *chnd;
532
533         SetEVState(IO, eCurlStart);
534         chnd = IO->HttpReq.chnd;
535         EVCURL_syslog(LOG_DEBUG,
536                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
537         OPT(URL, IO->ConnectMe->PlainUrl);
538         if (StrLength(IO->ConnectMe->CurlCreds))
539         {
540                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
541                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
542         }
543         if (StrLength(IO->HttpReq.PostData) > 0)
544         {
545                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
546                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
547
548         }
549         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
550                  (IO->HttpReq.PlainPostData != NULL))
551         {
552                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
553                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
554         }
555         OPT(HTTPHEADER, IO->HttpReq.headers);
556
557         IO->NextState = eConnect;
558         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
559         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
560         if (msta)
561         {
562                 EVCURL_syslog(LOG_ERR,
563                           "EVCURL: error attaching to curl multi handle: %s\n",
564                           curl_multi_strerror(msta));
565         }
566
567         IO->HttpReq.attached = 1;
568         ev_async_send (event_base, &WakeupCurl);
569
570         ev_cleanup_init(&IO->abort_by_shutdown,
571                         IOcurl_abort_shutdown_callback);
572
573         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
574
575         return eReadMessage;
576 }
577
578 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
579 {
580         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
581
582         curl_multi_perform(&global, CURL_POLL_NONE);
583 }
584
585 static void evcurl_shutdown (void)
586 {
587         curl_global_cleanup();
588         curl_multi_cleanup(global.mhnd);
589         CURLM_syslog(LOG_DEBUG, "exiting\n");
590 }
591 /*****************************************************************************
592  *                       libevent integration                                *
593  *****************************************************************************/
594 /*
595  * client event queue plus its methods.
596  * this currently is the main loop (which may change in some future?)
597  */
598 int evbase_count = 0;
599 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
600 pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
601 HashList *QueueEvents = NULL;
602 HashList *InboundEventQueue = NULL;
603 HashList *InboundEventQueues[2] = { NULL, NULL };
604 extern void ShutDownCLient(AsyncIO *IO);
605
606 ev_async AddJob;
607 ev_async ExitEventLoop;
608
609 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
610 {
611         CitContext *Ctx;
612         long IOID = -1;
613         long count = 0;
614         ev_tstamp Now;
615         HashList *q;
616         void *v;
617         HashPos*It;
618         long len;
619         const char *Key;
620
621         /* get the control command... */
622         pthread_mutex_lock(&EventQueueMutex);
623
624         if (InboundEventQueues[0] == InboundEventQueue) {
625                 InboundEventQueue = InboundEventQueues[1];
626                 q = InboundEventQueues[0];
627         }
628         else {
629                 InboundEventQueue = InboundEventQueues[0];
630                 q = InboundEventQueues[1];
631         }
632         pthread_mutex_unlock(&EventQueueMutex);
633         Now = ev_now (event_base);
634         It = GetNewHashPos(q, 0);
635         while (GetNextHashPos(q, It, &len, &Key, &v))
636         {
637                 IOAddHandler *h = v;
638                 count ++;
639                 if (h->IO->ID == 0) {
640                         h->IO->ID = EvIDSource++;
641                 }
642                 IOID = h->IO->ID;
643                 if (h->IO->StartIO == 0.0)
644                         h->IO->StartIO = Now;
645
646                 SetEVState(h->IO, eIOAttach);
647
648                 Ctx = h->IO->CitContext;
649                 become_session(Ctx);
650
651                 h->IO->Now = Now;
652                 switch (h->EvAttch(h->IO))
653                 {
654                 case eReadMore:
655                 case eReadMessage:
656                 case eReadFile:
657                 case eSendReply:
658                 case eSendMore:
659                 case eReadPayload:
660                 case eSendFile:
661                 case eDBQuery:
662                 case eSendDNSQuery:
663                 case eReadDNSReply:
664                 case eConnect:
665                         break;
666                 case eTerminateConnection:
667                 case eAbort:
668                         ShutDownCLient(h->IO);
669                 break;
670                 }
671         }
672         DeleteHashPos(&It);
673         DeleteHashContent(&q);
674         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld  done.", IOSTR, IOID, count);
675 }
676
677
678 static void EventExitCallback(EV_P_ ev_async *w, int revents)
679 {
680         ev_break(event_base, EVBREAK_ALL);
681
682         EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
683 }
684
685
686
687 void InitEventQueue(void)
688 {
689         pthread_mutex_init(&EventQueueMutex, NULL);
690         pthread_mutex_init(&EventExitQueueMutex, NULL);
691
692         QueueEvents = NewHash(1, Flathash);
693         InboundEventQueues[0] = NewHash(1, Flathash);
694         InboundEventQueues[1] = NewHash(1, Flathash);
695         InboundEventQueue = InboundEventQueues[0];
696 }
/* Defined elsewhere; both are used by client_event_thread() on exit. */
extern void CtdlDestroyEVCleanupHooks(void);
extern int EVQShutDown;

/* Tag stored under evConKey by client_event_thread(). */
const char *IOLog = "IO";
701 /*
702  * this thread operates the select() etc. via libev.
703  */
704 void *client_event_thread(void *arg) 
705 {
706         struct CitContext libev_client_CC;
707
708         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
709
710         pthread_setspecific(evConKey, IOLog);
711
712         EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");
713
714         event_base = ev_default_loop (EVFLAG_AUTO);
715         ev_async_init(&AddJob, QueueEventAddCallback);
716         ev_async_start(event_base, &AddJob);
717         ev_async_init(&ExitEventLoop, EventExitCallback);
718         ev_async_start(event_base, &ExitEventLoop);
719         ev_async_init(&WakeupCurl, WakeupCurlCallback);
720         ev_async_start(event_base, &WakeupCurl);
721
722         curl_init_connectionpool();
723
724         ev_run (event_base, 0);
725
726         EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
727
728 ///what todo here?      CtdlClearSystemContext();
729         pthread_mutex_lock(&EventExitQueueMutex);
730         ev_loop_destroy (EV_DEFAULT_UC);
731         event_base = NULL;
732         DeleteHash(&QueueEvents);
733         InboundEventQueue = NULL;
734         DeleteHash(&InboundEventQueues[0]);
735         DeleteHash(&InboundEventQueues[1]);
736 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
737         evcurl_shutdown();
738
739         CtdlDestroyEVCleanupHooks();
740
741         pthread_mutex_unlock(&EventExitQueueMutex);
742         EVQShutDown = 1;
743         return(NULL);
744 }
745
746 /*----------------------------------------------------------------------------*/
747 /*
748  * DB-Queue; does async bdb operations.
749  * has its own set of handlers.
750  */
751 ev_loop *event_db;
752 int evdb_count = 0;
753 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
754 pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
755 HashList *DBQueueEvents = NULL;
756 HashList *DBInboundEventQueue = NULL;
757 HashList *DBInboundEventQueues[2] = { NULL, NULL };
758
759 ev_async DBAddJob;
760 ev_async DBExitEventLoop;
761
762 extern void ShutDownDBCLient(AsyncIO *IO);
763
764 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
765 {
766         CitContext *Ctx;
767         long IOID = -1;
768         long count = 0;;
769         ev_tstamp Now;
770         HashList *q;
771         void *v;
772         HashPos *It;
773         long len;
774         const char *Key;
775
776         /* get the control command... */
777         pthread_mutex_lock(&DBEventQueueMutex);
778
779         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
780                 DBInboundEventQueue = DBInboundEventQueues[1];
781                 q = DBInboundEventQueues[0];
782         }
783         else {
784                 DBInboundEventQueue = DBInboundEventQueues[0];
785                 q = DBInboundEventQueues[1];
786         }
787         pthread_mutex_unlock(&DBEventQueueMutex);
788
789         Now = ev_now (event_db);
790         It = GetNewHashPos(q, 0);
791         while (GetNextHashPos(q, It, &len, &Key, &v))
792         {
793                 IOAddHandler *h = v;
794                 eNextState rc;
795                 count ++;
796                 if (h->IO->ID == 0)
797                         h->IO->ID = EvIDSource++;
798                 IOID = h->IO->ID;
799                 if (h->IO->StartDB == 0.0)
800                         h->IO->StartDB = Now;
801                 h->IO->Now = Now;
802
803                 SetEVState(h->IO, eDBAttach);
804                 Ctx = h->IO->CitContext;
805                 become_session(Ctx);
806                 ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
807                 rc = h->EvAttch(h->IO);
808                 switch (rc)
809                 {
810                 case eAbort:
811                         ShutDownDBCLient(h->IO);
812                 default:
813                         break;
814                 }
815         }
816         DeleteHashPos(&It);
817         DeleteHashContent(&q);
818         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count);
819 }
820
821
822 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
823 {
824         EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
825         ev_break(event_db, EVBREAK_ALL);
826 }
827
828
829
830 void DBInitEventQueue(void)
831 {
832         pthread_mutex_init(&DBEventQueueMutex, NULL);
833         pthread_mutex_init(&DBEventExitQueueMutex, NULL);
834
835         DBQueueEvents = NewHash(1, Flathash);
836         DBInboundEventQueues[0] = NewHash(1, Flathash);
837         DBInboundEventQueues[1] = NewHash(1, Flathash);
838         DBInboundEventQueue = DBInboundEventQueues[0];
839 }
840
/* Tag stored under evConKey by db_event_thread(). */
const char *DBLog = "BD";
842
843 /*
844  * this thread operates writing to the message database via libev.
845  */
846 void *db_event_thread(void *arg)
847 {
848         ev_loop *tmp;
849         struct CitContext libev_msg_CC;
850
851         pthread_setspecific(evConKey, DBLog);
852
853         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
854
855         EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
856
857         tmp = event_db = ev_loop_new (EVFLAG_AUTO);
858
859         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
860         ev_async_start(event_db, &DBAddJob);
861         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
862         ev_async_start(event_db, &DBExitEventLoop);
863
864         ev_run (event_db, 0);
865
866         pthread_mutex_lock(&DBEventExitQueueMutex);
867
868         event_db = NULL;
869         EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n");
870
871         DeleteHash(&DBQueueEvents);
872         DBInboundEventQueue = NULL;
873         DeleteHash(&DBInboundEventQueues[0]);
874         DeleteHash(&DBInboundEventQueues[1]);
875
876 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
877
878         ev_loop_destroy (tmp);
879         pthread_mutex_unlock(&DBEventExitQueueMutex);
880         return(NULL);
881 }
882
883 void ShutDownEventQueues(void)
884 {
885         EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
886
887         pthread_mutex_lock(&DBEventQueueMutex);
888         ev_async_send (event_db, &DBExitEventLoop);
889         pthread_mutex_unlock(&DBEventQueueMutex);
890
891         pthread_mutex_lock(&EventQueueMutex);
892         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
893         pthread_mutex_unlock(&EventQueueMutex);
894 }
895
896 void DebugEventloopEnable(const int n)
897 {
898         DebugEventLoop = n;
899 }
900 void DebugEventloopBacktraceEnable(const int n)
901 {
902         DebugEventLoopBacktrace = n;
903 }
904
905 void DebugCurlEnable(const int n)
906 {
907         DebugCurl = n;
908 }
909
910 const char *WLog = "WX";
911 CTDL_MODULE_INIT(event_client)
912 {
913         if (!threading)
914         {
915                 if (pthread_key_create(&evConKey, NULL) != 0) {
916                         syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno));
917                 }
918                 pthread_setspecific(evConKey, WLog);
919
920                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
921                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
922                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
923                 InitEventQueue();
924                 DBInitEventQueue();
925                 CtdlThreadCreate(client_event_thread);
926                 CtdlThreadCreate(db_event_thread);
927         }
928         return "event";
929 }