39bb108a7af1dce3e03fc2ef65f8fe39e1a5eb70
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
/* libev loop used for client I/O; assigned in client_event_thread(). */
62 ev_loop *event_base;
/* Set to 1 by DebugEventloopEnable(). */
63 int DebugEventLoop = 0;
/* Set to 1 by DebugCurlEnable(); the DBGLOG macro only lets LOG_DEBUG
 * messages through when this is non-zero. */
64 int DebugCurl = 0;
65
/* Next AsyncIO ID; the queue-add callbacks hand it out and increment it. */
66 long EvIDSource = 1;
67 /*****************************************************************************
68  *                   libevent / curl integration                             *
69  *****************************************************************************/
/*
 * MOPT(s, v): set option CURLMOPT_<s> to v on the curl multi handle.
 *
 * Expects a variable named `mhnd` (the CURLM handle) to be in scope at the
 * call site.  curl_multi_setopt() reports a CURLMcode, so the result is kept
 * in a macro-local CURLMcode and decoded with curl_multi_strerror().
 * On failure the error is logged and the process exits with status 1.
 */
#define MOPT(s, v)\
	do {								\
		CURLMcode mopt_sta_ =					\
			curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));	\
		if (mopt_sta_ != CURLM_OK) {				\
			syslog(LOG_ERR, "EVCURL: error setting option "	\
			       #s " on curl multi handle: %s\n",	\
			       curl_multi_strerror(mopt_sta_));		\
			exit (1);					\
		}							\
	} while (0)
80
/*
 * DBGLOG(LEVEL): guard that lets every level through except LOG_DEBUG,
 * which is only emitted when DebugCurl is non-zero.  It expands to a bare
 * `if`, so it must only be used inside a braced block.
 */
#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0))

/*
 * Logging helpers.  Each one is wrapped in do { } while (0) so that it
 * behaves like a single statement; without the wrapper an `else` following
 * a macro call would attach to the hidden `if` from DBGLOG.
 *
 * EVCURL_syslog / EVCURLM_syslog need a valid `IO` (and whatever CCID
 * expands to) in scope; the M variants take a format without arguments.
 */
#define EVCURL_syslog(LEVEL, FORMAT, ...) \
	do { DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, IO->ID, CCID, __VA_ARGS__); } while (0)

#define EVCURLM_syslog(LEVEL, FORMAT) \
	do { DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, IO->ID, CCID); } while (0)

#define CURL_syslog(LEVEL, FORMAT, ...) \
	do { DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__); } while (0)

#define CURLM_syslog(LEVEL, FORMAT) \
	do { DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT); } while (0)
94
/*
 * State shared by the curl <-> libev glue in this file.
 */
95 typedef struct _evcurl_global_data {
/* Never written in the visible code.  NOTE(review): confirm purpose. */
96         int magic;
/* curl multi handle, created in curl_init_connectionpool(). */
97         CURLM *mhnd;
/* Timer that wakes curl for timeouts; (re)armed in gotwatchtime(). */
98         ev_timer timeev;
/* Last running-transfer count seen by gotstatus(); -1 before first use. */
99         int nrun;
100 } evcurl_global_data;
101
/* Async watcher signalled by evcurl_handle_start(). */
102 ev_async WakeupCurl;
/* The single instance of the glue state. */
103 evcurl_global_data global;
104
105 static void
106 gotstatus(int nnrun)
107 {
108         CURLMsg *msg;
109         int nmsg;
110
111         global.nrun = nnrun;
112
113         CURLM_syslog(LOG_DEBUG,
114                      "gotstatus(): about to call curl_multi_info_read\n");
115         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
116                 CURL_syslog(LOG_DEBUG,
117                             "got curl multi_info message msg=%d\n",
118                             msg->msg);
119
120                 if (CURLMSG_DONE == msg->msg) {
121                         CURL *chnd;
122                         char *chandle = NULL;
123                         CURLcode sta;
124                         CURLMcode msta;
125                         AsyncIO*IO;
126
127                         chandle = NULL;;
128                         chnd = msg->easy_handle;
129                         sta = curl_easy_getinfo(chnd,
130                                                 CURLINFO_PRIVATE,
131                                                 &chandle);
132                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
133                         if (sta) {
134                                 EVCURL_syslog(LOG_ERR,
135                                               "error asking curl for private"
136                                               " cookie of curl handle: %s\n",
137                                               curl_easy_strerror(sta));
138                                 continue;
139                         }
140                         IO = (AsyncIO *)chandle;
141
142                         IO->Now = ev_now(event_base);
143
144                         ev_io_stop(event_base, &IO->recv_event);
145                         ev_io_stop(event_base, &IO->send_event);
146
147                         sta = msg->data.result;
148                         if (sta) {
149                                 EVCURL_syslog(LOG_ERR,
150                                               "error description: %s\n",
151                                               IO->HttpReq.errdesc);
152                                 EVCURL_syslog(LOG_ERR,
153                                               "error performing request: %s\n",
154                                               curl_easy_strerror(sta));
155                         }
156                         sta = curl_easy_getinfo(chnd,
157                                                 CURLINFO_RESPONSE_CODE,
158                                                 &IO->HttpReq.httpcode);
159                         if (sta)
160                                 EVCURL_syslog(LOG_ERR,
161                                               "error asking curl for "
162                                               "response code from request: %s\n",
163                                               curl_easy_strerror(sta));
164                         EVCURL_syslog(LOG_ERR,
165                                       "http response code was %ld\n",
166                                       (long)IO->HttpReq.httpcode);
167
168
169                         curl_slist_free_all(IO->HttpReq.headers);
170                         msta = curl_multi_remove_handle(global.mhnd, chnd);
171                         if (msta)
172                                 EVCURL_syslog(LOG_ERR,
173                                               "warning problem detaching "
174                                               "completed handle from curl multi: "
175                                               "%s\n",
176                                               curl_multi_strerror(msta));
177
178                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
179
180                         IO->HttpReq.attached = 0;
181                         switch(IO->SendDone(IO))
182                         {
183                         case eDBQuery:
184                                 curl_easy_cleanup(IO->HttpReq.chnd);
185                                 IO->HttpReq.chnd = NULL;
186                                 break;
187                         case eSendDNSQuery:
188                         case eReadDNSReply:
189                         case eConnect:
190                         case eSendReply:
191                         case eSendMore:
192                         case eSendFile:
193                         case eReadMessage:
194                         case eReadMore:
195                         case eReadPayload:
196                         case eReadFile:
197                                 curl_easy_cleanup(IO->HttpReq.chnd);
198                                 IO->HttpReq.chnd = NULL;
199                                 break;
200                         case eTerminateConnection:
201                         case eAbort:
202                                 curl_easy_cleanup(IO->HttpReq.chnd);
203                                 IO->HttpReq.chnd = NULL;
204                                 FreeStrBuf(&IO->HttpReq.ReplyData);
205                                 FreeURL(&IO->ConnectMe);
206                                 RemoveContext(IO->CitContext);
207                                 IO->Terminate(IO);
208                         }
209                 }
210         }
211 }
212
213 static void
214 stepmulti(void *data, curl_socket_t fd, int which)
215 {
216         int running_handles = 0;
217         CURLMcode msta;
218
219         msta = curl_multi_socket_action(global.mhnd,
220                                         fd,
221                                         which,
222                                         &running_handles);
223
224         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
225         if (msta)
226                 CURL_syslog(LOG_ERR,
227                             "error in curl processing events"
228                             "on multi handle, fd %d: %s\n",
229                             (int)fd,
230                             curl_multi_strerror(msta));
231
232         if (global.nrun != running_handles)
233                 gotstatus(running_handles);
234 }
235
236 static void
237 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
238 {
239         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
240         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
241 }
242
243 static void
244 got_in(struct ev_loop *loop, ev_io *ioev, int events)
245 {
246         CURL_syslog(LOG_DEBUG,
247                     "EVCURL: waking up curl for io on fd %d\n",
248                     (int)ioev->fd);
249
250         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
251 }
252
253 static void
254 got_out(struct ev_loop *loop, ev_io *ioev, int events)
255 {
256         CURL_syslog(LOG_DEBUG,
257                     "waking up curl for io on fd %d\n",
258                     (int)ioev->fd);
259
260         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
261 }
262
263 static size_t
264 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
265         AsyncIO *IO = (AsyncIO*) cglobal;
266
267         if (IO->HttpReq.ReplyData == NULL)
268         {
269                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
270         }
271         IO->Now = ev_now(event_base);
272         return CurlFillStrBuf_callback(data,
273                                        size,
274                                        nmemb,
275                                        IO->HttpReq.ReplyData);
276 }
277
278 static int
279 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
280         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
281         evcurl_global_data *global = cglobal;
282         ev_timer_stop(EV_DEFAULT, &global->timeev);
283         if (tblock_ms < 0 || 14000 < tblock_ms)
284                 tblock_ms = 14000;
285         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
286         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
287         curl_multi_perform(global, &global->nrun);
288         return 0;
289 }
290
291 static int
292 gotwatchsock(CURL *easy,
293              curl_socket_t fd,
294              int action,
295              void *cglobal,
296              void *vIO)
297 {
298         evcurl_global_data *global = cglobal;
299         CURLM *mhnd = global->mhnd;
300         char *f;
301         AsyncIO *IO = (AsyncIO*) vIO;
302         CURLcode sta;
303         const char *Action;
304
305         if (IO == NULL) {
306                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
307                 if (sta) {
308                         EVCURL_syslog(LOG_ERR,
309                                       "EVCURL: error asking curl for private "
310                                       "cookie of curl handle: %s\n",
311                                       curl_easy_strerror(sta));
312                         return -1;
313                 }
314                 IO = (AsyncIO *) f;
315                 EVCURL_syslog(LOG_DEBUG,
316                               "EVCURL: got socket for URL: %s\n",
317                               IO->ConnectMe->PlainUrl);
318
319                 if (IO->SendBuf.fd != 0)
320                 {
321                         ev_io_stop(event_base, &IO->recv_event);
322                         ev_io_stop(event_base, &IO->send_event);
323                 }
324                 IO->SendBuf.fd = fd;
325                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
326                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
327                 curl_multi_assign(mhnd, fd, IO);
328         }
329
330         IO->Now = ev_now(event_base);
331
332         Action = "";
333         switch (action)
334         {
335         case CURL_POLL_NONE:
336                 Action = "CURL_POLL_NONE";
337                 break;
338         case CURL_POLL_REMOVE:
339                 Action = "CURL_POLL_REMOVE";
340                 break;
341         case CURL_POLL_IN:
342                 Action = "CURL_POLL_IN";
343                 break;
344         case CURL_POLL_OUT:
345                 Action = "CURL_POLL_OUT";
346                 break;
347         case CURL_POLL_INOUT:
348                 Action = "CURL_POLL_INOUT";
349                 break;
350         }
351
352
353         EVCURL_syslog(LOG_DEBUG,
354                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
355                       (int)fd, Action, action);
356
357         switch (action)
358         {
359         case CURL_POLL_NONE:
360                 EVCURLM_syslog(LOG_ERR,
361                                "called first time "
362                                "to register this sockwatcker\n");
363                 break;
364         case CURL_POLL_REMOVE:
365                 EVCURLM_syslog(LOG_ERR,
366                                "called last time to unregister "
367                                "this sockwatcher\n");
368                 ev_io_stop(event_base, &IO->recv_event);
369                 ev_io_stop(event_base, &IO->send_event);
370                 break;
371         case CURL_POLL_IN:
372                 ev_io_start(event_base, &IO->recv_event);
373                 ev_io_stop(event_base, &IO->send_event);
374                 break;
375         case CURL_POLL_OUT:
376                 ev_io_start(event_base, &IO->send_event);
377                 ev_io_stop(event_base, &IO->recv_event);
378                 break;
379         case CURL_POLL_INOUT:
380                 ev_io_start(event_base, &IO->send_event);
381                 ev_io_start(event_base, &IO->recv_event);
382                 break;
383         }
384         return 0;
385 }
386
387 void curl_init_connectionpool(void)
388 {
389         CURLM *mhnd ;
390
391         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
392         global.timeev.data = (void *)&global;
393         global.nrun = -1;
394         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
395
396         if (sta)
397         {
398                 CURL_syslog(LOG_ERR,
399                             "error initializing curl library: %s\n",
400                             curl_easy_strerror(sta));
401
402                 exit(1);
403         }
404         mhnd = global.mhnd = curl_multi_init();
405         if (!mhnd)
406         {
407                 CURLM_syslog(LOG_ERR,
408                              "error initializing curl multi handle\n");
409                 exit(3);
410         }
411
412         MOPT(SOCKETFUNCTION, &gotwatchsock);
413         MOPT(SOCKETDATA, (void *)&global);
414         MOPT(TIMERFUNCTION, &gotwatchtime);
415         MOPT(TIMERDATA, (void *)&global);
416
417         return;
418 }
419
420 int evcurl_init(AsyncIO *IO)
421 {
422         CURLcode sta;
423         CURL *chnd;
424
425         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
426         IO->HttpReq.attached = 0;
427         chnd = IO->HttpReq.chnd = curl_easy_init();
428         if (!chnd)
429         {
430                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
431                 return 0;
432         }
433
434 #if DEBUG
435         OPT(VERBOSE, (long)1);
436 #endif
437         OPT(NOPROGRESS, 1L);
438
439         OPT(NOSIGNAL, 1L);
440         OPT(FAILONERROR, (long)1);
441         OPT(ENCODING, "");
442         OPT(FOLLOWLOCATION, (long)0);
443         OPT(MAXREDIRS, (long)0);
444         OPT(USERAGENT, CITADEL);
445
446         OPT(TIMEOUT, (long)1800);
447         OPT(LOW_SPEED_LIMIT, (long)64);
448         OPT(LOW_SPEED_TIME, (long)600);
449         OPT(CONNECTTIMEOUT, (long)600);
450         OPT(PRIVATE, (void *)IO);
451
452         OPT(FORBID_REUSE, 1);
453         OPT(WRITEFUNCTION, &gotdata);
454         OPT(WRITEDATA, (void *)IO);
455         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
456
457         if ((!IsEmptyStr(config.c_ip_addr))
458                 && (strcmp(config.c_ip_addr, "*"))
459                 && (strcmp(config.c_ip_addr, "::"))
460                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
461                 )
462         {
463                 OPT(INTERFACE, config.c_ip_addr);
464         }
465
466 #ifdef CURLOPT_HTTP_CONTENT_DECODING
467         OPT(HTTP_CONTENT_DECODING, 1);
468         OPT(ENCODING, "");
469 #endif
470
471         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
472                                                 "Connection: close");
473
474         return 1;
475 }
476
477
478 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
479                                            ev_cleanup *watcher,
480                                            int revents)
481 {
482         CURLMcode msta;
483         AsyncIO *IO = watcher->data;
484         IO->Now = ev_now(event_base);
485         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
486
487         curl_slist_free_all(IO->HttpReq.headers);
488         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
489         if (msta)
490         {
491                 EVCURL_syslog(LOG_ERR,
492                               "EVCURL: warning problem detaching completed handle "
493                               "from curl multi: %s\n",
494                               curl_multi_strerror(msta));
495         }
496
497         curl_easy_cleanup(IO->HttpReq.chnd);
498         IO->HttpReq.chnd = NULL;
499         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
500         ev_io_stop(event_base, &IO->recv_event);
501         ev_io_stop(event_base, &IO->send_event);
502         assert(IO->ShutdownAbort);
503         IO->ShutdownAbort(IO);
504 }
505 eNextState
506 evcurl_handle_start(AsyncIO *IO)
507 {
508         CURLMcode msta;
509         CURLcode sta;
510         CURL *chnd;
511
512         chnd = IO->HttpReq.chnd;
513         EVCURL_syslog(LOG_DEBUG,
514                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
515         OPT(URL, IO->ConnectMe->PlainUrl);
516         if (StrLength(IO->ConnectMe->CurlCreds))
517         {
518                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
519                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
520         }
521         if (StrLength(IO->HttpReq.PostData) > 0)
522         {
523                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
524                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
525
526         }
527         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
528                  (IO->HttpReq.PlainPostData != NULL))
529         {
530                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
531                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
532         }
533         OPT(HTTPHEADER, IO->HttpReq.headers);
534
535         IO->NextState = eConnect;
536         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
537         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
538         if (msta)
539         {
540                 EVCURL_syslog(LOG_ERR,
541                           "EVCURL: error attaching to curl multi handle: %s\n",
542                           curl_multi_strerror(msta));
543         }
544
545         IO->HttpReq.attached = 1;
546         ev_async_send (event_base, &WakeupCurl);
547         ev_cleanup_init(&IO->abort_by_shutdown,
548                         IOcurl_abort_shutdown_callback);
549
550         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
551         return eReadMessage;
552 }
553
554 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
555 {
556         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
557
558         curl_multi_perform(&global, CURL_POLL_NONE);
559 }
560
561 static void evcurl_shutdown (void)
562 {
563         curl_global_cleanup();
564         curl_multi_cleanup(global.mhnd);
565         CURLM_syslog(LOG_DEBUG, "exiting\n");
566 }
567 /*****************************************************************************
568  *                       libevent integration                                *
569  *****************************************************************************/
570 /*
571  * client event queue plus its methods.
572  * this currently is the main loop (which may change in some future?)
573  */
/* Not referenced in the visible part of this file. */
574 int evbase_count = 0;
/* Pipe created in InitEventQueue() and closed by client_event_thread(). */
575 int event_add_pipe[2] = {-1, -1};
576 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
577 HashList *QueueEvents = NULL;
/* Always points at one of the two InboundEventQueues; the add callback
 * switches it to the other one before draining. */
578 HashList *InboundEventQueue = NULL;
579 HashList *InboundEventQueues[2] = { NULL, NULL };
580
/* Async watchers: AddJob runs QueueEventAddCallback(), ExitEventLoop runs
 * EventExitCallback() (both wired up in client_event_thread()). */
581 ev_async AddJob;
582 ev_async ExitEventLoop;
583
584 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
585 {
586         ev_tstamp Now;
587         HashList *q;
588         void *v;
589         HashPos*It;
590         long len;
591         const char *Key;
592
593         /* get the control command... */
594         pthread_mutex_lock(&EventQueueMutex);
595
596         if (InboundEventQueues[0] == InboundEventQueue) {
597                 InboundEventQueue = InboundEventQueues[1];
598                 q = InboundEventQueues[0];
599         }
600         else {
601                 InboundEventQueue = InboundEventQueues[0];
602                 q = InboundEventQueues[1];
603         }
604         pthread_mutex_unlock(&EventQueueMutex);
605         Now = ev_now (event_base);
606         It = GetNewHashPos(q, 0);
607         while (GetNextHashPos(q, It, &len, &Key, &v))
608         {
609                 IOAddHandler *h = v;
610                 if (h->IO->ID == 0) {
611                         h->IO->ID = EvIDSource++;
612                 }
613                 if (h->IO->StartIO == 0.0)
614                         h->IO->StartIO = Now;
615                 h->IO->Now = Now;
616                 h->EvAttch(h->IO);
617         }
618         DeleteHashPos(&It);
619         DeleteHashContent(&q);
620         syslog(LOG_DEBUG, "EVENT Q Add done.\n");
621 }
622
623
624 static void EventExitCallback(EV_P_ ev_async *w, int revents)
625 {
626         ev_break(event_base, EVBREAK_ALL);
627
628         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
629 }
630
631
632
633 void InitEventQueue(void)
634 {
635         struct rlimit LimitSet;
636
637         pthread_mutex_init(&EventQueueMutex, NULL);
638
639         if (pipe(event_add_pipe) != 0) {
640                 syslog(LOG_EMERG,
641                        "Unable to create pipe for libev queueing: %s\n",
642                        strerror(errno));
643                 abort();
644         }
645         LimitSet.rlim_cur = 1;
646         LimitSet.rlim_max = 1;
647         setrlimit(event_add_pipe[1], &LimitSet);
648
649         QueueEvents = NewHash(1, Flathash);
650         InboundEventQueues[0] = NewHash(1, Flathash);
651         InboundEventQueues[1] = NewHash(1, Flathash);
652         InboundEventQueue = InboundEventQueues[0];
653 }
/* Defined elsewhere; called by client_event_thread() during teardown. */
654 extern void CtdlDestroyEVCleanupHooks(void);
655
/* Defined elsewhere; client_event_thread() sets it to 1 just before it
 * returns. */
656 extern int EVQShutDown;
657 /*
658  * this thread operates the select() etc. via libev.
659  */
660 void *client_event_thread(void *arg) 
661 {
662         struct CitContext libev_client_CC;
663
664         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
665 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
666         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
667
668         event_base = ev_default_loop (EVFLAG_AUTO);
669         ev_async_init(&AddJob, QueueEventAddCallback);
670         ev_async_start(event_base, &AddJob);
671         ev_async_init(&ExitEventLoop, EventExitCallback);
672         ev_async_start(event_base, &ExitEventLoop);
673         ev_async_init(&WakeupCurl, WakeupCurlCallback);
674         ev_async_start(event_base, &WakeupCurl);
675
676         curl_init_connectionpool();
677
678         ev_run (event_base, 0);
679
680         syslog(LOG_DEBUG, "client_event_thread() exiting\n");
681
682 ///what todo here?      CtdlClearSystemContext();
683         ev_loop_destroy (EV_DEFAULT_UC);
684         DeleteHash(&QueueEvents);
685         InboundEventQueue = NULL;
686         DeleteHash(&InboundEventQueues[0]);
687         DeleteHash(&InboundEventQueues[1]);
688 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
689         evcurl_shutdown();
690         close(event_add_pipe[0]);
691         close(event_add_pipe[1]);
692
693         CtdlDestroyEVCleanupHooks();
694
695         EVQShutDown = 1;        
696         return(NULL);
697 }
698
699 /*----------------------------------------------------------------------------*/
700 /*
701  * DB-Queue; does async bdb operations.
702  * has its own set of handlers.
703  */
/* libev loop of the DB queue; created in db_event_thread(). */
704 ev_loop *event_db;
/* Not referenced in the visible part of this file. */
705 int evdb_count = 0;
/* Pipe created in DBInitEventQueue() and closed by db_event_thread(). */
706 int evdb_add_pipe[2] = {-1, -1};
707 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
708 HashList *DBQueueEvents = NULL;
/* Always points at one of the two DBInboundEventQueues; the add callback
 * switches it to the other one before draining. */
709 HashList *DBInboundEventQueue = NULL;
710 HashList *DBInboundEventQueues[2] = { NULL, NULL };
711
/* Async watchers wired up in db_event_thread(). */
712 ev_async DBAddJob;
713 ev_async DBExitEventLoop;
714
/* Defined elsewhere; called when a job's attach handler returns eAbort. */
715 extern void ShutDownDBCLient(AsyncIO *IO);
716
717 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
718 {
719         ev_tstamp Now;
720         HashList *q;
721         void *v;
722         HashPos *It;
723         long len;
724         const char *Key;
725
726         /* get the control command... */
727         pthread_mutex_lock(&DBEventQueueMutex);
728
729         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
730                 DBInboundEventQueue = DBInboundEventQueues[1];
731                 q = DBInboundEventQueues[0];
732         }
733         else {
734                 DBInboundEventQueue = DBInboundEventQueues[0];
735                 q = DBInboundEventQueues[1];
736         }
737         pthread_mutex_unlock(&DBEventQueueMutex);
738
739         Now = ev_now (event_db);
740         It = GetNewHashPos(q, 0);
741         while (GetNextHashPos(q, It, &len, &Key, &v))
742         {
743                 IOAddHandler *h = v;
744                 eNextState rc;
745                 if (h->IO->ID == 0)
746                         h->IO->ID = EvIDSource++;
747                 if (h->IO->StartDB == 0.0)
748                         h->IO->StartDB = Now;
749                 h->IO->Now = Now;
750                 rc = h->EvAttch(h->IO);
751                 switch (rc)
752                 {
753                 case eAbort:
754                         ShutDownDBCLient(h->IO);
755                 default:
756                         break;
757                 }
758         }
759         DeleteHashPos(&It);
760         DeleteHashContent(&q);
761         syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
762 }
763
764
765 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
766 {
767         syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
768         ev_break(event_db, EVBREAK_ALL);
769 }
770
771
772
773 void DBInitEventQueue(void)
774 {
775         struct rlimit LimitSet;
776
777         pthread_mutex_init(&DBEventQueueMutex, NULL);
778
779         if (pipe(evdb_add_pipe) != 0) {
780                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
781                 abort();
782         }
783         LimitSet.rlim_cur = 1;
784         LimitSet.rlim_max = 1;
785         setrlimit(evdb_add_pipe[1], &LimitSet);
786
787         DBQueueEvents = NewHash(1, Flathash);
788         DBInboundEventQueues[0] = NewHash(1, Flathash);
789         DBInboundEventQueues[1] = NewHash(1, Flathash);
790         DBInboundEventQueue = DBInboundEventQueues[0];
791 }
792
793 /*
794  * this thread operates writing to the message database via libev.
795  */
796 void *db_event_thread(void *arg)
797 {
798         struct CitContext libev_msg_CC;
799
800         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
801 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
802         syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
803
804         event_db = ev_loop_new (EVFLAG_AUTO);
805
806         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
807         ev_async_start(event_db, &DBAddJob);
808         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
809         ev_async_start(event_db, &DBExitEventLoop);
810
811         ev_run (event_db, 0);
812
813         syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
814
815 //// what to do here?   CtdlClearSystemContext();
816         ev_loop_destroy (event_db);
817
818         DeleteHash(&DBQueueEvents);
819         DBInboundEventQueue = NULL;
820         DeleteHash(&DBInboundEventQueues[0]);
821         DeleteHash(&DBInboundEventQueues[1]);
822
823         close(evdb_add_pipe[0]);
824         close(evdb_add_pipe[1]);
825 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
826
827         return(NULL);
828 }
829
830 void ShutDownEventQueues(void)
831 {
832         syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
833
834         pthread_mutex_lock(&DBEventQueueMutex);
835         ev_async_send (event_db, &DBExitEventLoop);
836         pthread_mutex_unlock(&DBEventQueueMutex);
837
838         pthread_mutex_lock(&EventQueueMutex);
839         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
840         pthread_mutex_unlock(&EventQueueMutex);
841 }
842
843 void DebugEventloopEnable(void)
844 {
845         DebugEventLoop = 1;
846 }
847
848 void DebugCurlEnable(void)
849 {
850         DebugCurl = 1;
851 }
852
853 CTDL_MODULE_INIT(event_client)
854 {
855         if (!threading)
856         {
857                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable);
858                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable);
859                 InitEventQueue();
860                 DBInitEventQueue();
861                 CtdlThreadCreate(client_event_thread);
862                 CtdlThreadCreate(db_event_thread);
863         }
864         return "event";
865 }