97e217de9dd43c6ca3b9e171a47970b36dacaa5a
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63 int DebugEventLoop = 0;
64 int DebugCurl = 0;
65
66 long EvIDSource = 1;
67 /*****************************************************************************
68  *                   libevent / curl integration                             *
69  *****************************************************************************/
70 #define MOPT(s, v)\
71         do {                                                            \
72                 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
73                 if (sta) {                                              \
74                         syslog(LOG_ERR, "EVCURL: error setting option " \
75                                #s " on curl multi handle: %s\n",        \
76                                curl_easy_strerror(sta));                \
77                         exit (1);                                       \
78                 }                                                       \
79         } while (0)
80
81 typedef struct _evcurl_global_data {
82         int magic;
83         CURLM *mhnd;
84         ev_timer timeev;
85         int nrun;
86 } evcurl_global_data;
87
88 ev_async WakeupCurl;
89 evcurl_global_data global;
90
91 static void
92 gotstatus(int nnrun)
93 {
94         CURLMsg *msg;
95         int nmsg;
96
97         global.nrun = nnrun;
98
99         syslog(LOG_DEBUG,
100                "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
101         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
102                 syslog(LOG_ERR,
103                        "EVCURL: got curl multi_info message msg=%d\n",
104                        msg->msg);
105
106                 if (CURLMSG_DONE == msg->msg) {
107                         CURL *chnd;
108                         char *chandle;
109                         CURLcode sta;
110                         CURLMcode msta;
111                         AsyncIO*IO;
112
113                         chandle = NULL;;
114                         chnd = msg->easy_handle;
115                         sta = curl_easy_getinfo(chnd,
116                                                 CURLINFO_PRIVATE,
117                                                 &chandle);
118                         syslog(LOG_ERR, "EVCURL: request complete\n");
119                         if (sta)
120                                 syslog(LOG_ERR,
121                                        "EVCURL: error asking curl for private"
122                                        " cookie of curl handle: %s\n",
123                                        curl_easy_strerror(sta));
124                         IO = (AsyncIO *)chandle;
125
126                         IO->Now = ev_now(event_base);
127
128                         ev_io_stop(event_base, &IO->recv_event);
129                         ev_io_stop(event_base, &IO->send_event);
130
131                         sta = msg->data.result;
132                         if (sta) {
133                                 EV_syslog(LOG_ERR,
134                                           "EVCURL: error description: %s\n",
135                                           IO->HttpReq.errdesc);
136                                 EV_syslog(LOG_ERR,
137                                           "EVCURL: error performing request: %s\n",
138                                           curl_easy_strerror(sta));
139                         }
140                         sta = curl_easy_getinfo(chnd,
141                                                 CURLINFO_RESPONSE_CODE,
142                                                 &IO->HttpReq.httpcode);
143                         if (sta)
144                                 EV_syslog(LOG_ERR,
145                                           "EVCURL: error asking curl for "
146                                           "response code from request: %s\n",
147                                           curl_easy_strerror(sta));
148                         EV_syslog(LOG_ERR,
149                                   "EVCURL: http response code was %ld\n",
150                                   (long)IO->HttpReq.httpcode);
151
152
153                         curl_slist_free_all(IO->HttpReq.headers);
154                         msta = curl_multi_remove_handle(global.mhnd, chnd);
155                         if (msta)
156                                 EV_syslog(LOG_ERR,
157                                           "EVCURL: warning problem detaching "
158                                           "completed handle from curl multi: "
159                                           "%s\n",
160                                           curl_multi_strerror(msta));
161
162                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
163
164                         IO->HttpReq.attached = 0;
165                         switch(IO->SendDone(IO))
166                         {
167                         case eDBQuery:
168                                 curl_easy_cleanup(IO->HttpReq.chnd);
169                                 IO->HttpReq.chnd = NULL;
170                                 break;
171                         case eSendDNSQuery:
172                         case eReadDNSReply:
173                         case eConnect:
174                         case eSendReply:
175                         case eSendMore:
176                         case eSendFile:
177                         case eReadMessage:
178                         case eReadMore:
179                         case eReadPayload:
180                         case eReadFile:
181                                 curl_easy_cleanup(IO->HttpReq.chnd);
182                                 IO->HttpReq.chnd = NULL;
183                                 break;
184                         case eTerminateConnection:
185                         case eAbort:
186                                 curl_easy_cleanup(IO->HttpReq.chnd);
187                                 IO->HttpReq.chnd = NULL;
188                                 FreeStrBuf(&IO->HttpReq.ReplyData);
189                                 FreeURL(&IO->ConnectMe);
190                                 RemoveContext(IO->CitContext);
191                                 IO->Terminate(IO);
192                         }
193                 }
194         }
195 }
196
197 static void
198 stepmulti(void *data, curl_socket_t fd, int which)
199 {
200         int running_handles = 0;
201         CURLMcode msta;
202
203         msta = curl_multi_socket_action(global.mhnd,
204                                         fd,
205                                         which,
206                                         &running_handles);
207
208         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
209         if (msta)
210                 syslog(LOG_ERR,
211                        "EVCURL: error in curl processing events"
212                        "on multi handle, fd %d: %s\n",
213                        (int)fd,
214                        curl_multi_strerror(msta));
215
216         if (global.nrun != running_handles)
217                 gotstatus(running_handles);
218 }
219
220 static void
221 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
222 {
223         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
224         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
225 }
226
227 static void
228 got_in(struct ev_loop *loop, ev_io *ioev, int events)
229 {
230         syslog(LOG_DEBUG,
231                "EVCURL: waking up curl for io on fd %d\n",
232                (int)ioev->fd);
233
234         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
235 }
236
237 static void
238 got_out(struct ev_loop *loop, ev_io *ioev, int events)
239 {
240         syslog(LOG_DEBUG,
241                "EVCURL: waking up curl for io on fd %d\n",
242                (int)ioev->fd);
243
244         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
245 }
246
247 static size_t
248 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
249         AsyncIO *IO = (AsyncIO*) cglobal;
250
251         if (IO->HttpReq.ReplyData == NULL)
252         {
253                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
254         }
255         IO->Now = ev_now(event_base);
256         return CurlFillStrBuf_callback(data,
257                                        size,
258                                        nmemb,
259                                        IO->HttpReq.ReplyData);
260 }
261
262 static int
263 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
264         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
265         evcurl_global_data *global = cglobal;
266         ev_timer_stop(EV_DEFAULT, &global->timeev);
267         if (tblock_ms < 0 || 14000 < tblock_ms)
268                 tblock_ms = 14000;
269         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
270         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
271         curl_multi_perform(global, &global->nrun);
272         return 0;
273 }
274
275 static int
276 gotwatchsock(CURL *easy,
277              curl_socket_t fd,
278              int action,
279              void *cglobal,
280              void *vIO)
281 {
282         evcurl_global_data *global = cglobal;
283         CURLM *mhnd = global->mhnd;
284         char *f;
285         AsyncIO *IO = (AsyncIO*) vIO;
286         CURLcode sta;
287         const char *Action;
288
289         if (IO == NULL) {
290                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
291                 if (sta) {
292                         EV_syslog(LOG_ERR,
293                                   "EVCURL: error asking curl for private "
294                                   "cookie of curl handle: %s\n",
295                                   curl_easy_strerror(sta));
296                         return -1;
297                 }
298                 IO = (AsyncIO *) f;
299                 EV_syslog(LOG_DEBUG,
300                           "EVCURL: got socket for URL: %s\n",
301                           IO->ConnectMe->PlainUrl);
302
303                 if (IO->SendBuf.fd != 0)
304                 {
305                         ev_io_stop(event_base, &IO->recv_event);
306                         ev_io_stop(event_base, &IO->send_event);
307                 }
308                 IO->SendBuf.fd = fd;
309                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
310                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
311                 curl_multi_assign(mhnd, fd, IO);
312         }
313
314         IO->Now = ev_now(event_base);
315
316         Action = "";
317         switch (action)
318         {
319         case CURL_POLL_NONE:
320                 Action = "CURL_POLL_NONE";
321                 break;
322         case CURL_POLL_REMOVE:
323                 Action = "CURL_POLL_REMOVE";
324                 break;
325         case CURL_POLL_IN:
326                 Action = "CURL_POLL_IN";
327                 break;
328         case CURL_POLL_OUT:
329                 Action = "CURL_POLL_OUT";
330                 break;
331         case CURL_POLL_INOUT:
332                 Action = "CURL_POLL_INOUT";
333                 break;
334         }
335
336
337         EV_syslog(LOG_DEBUG,
338                   "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
339                   (int)fd, Action, action);
340
341         switch (action)
342         {
343         case CURL_POLL_NONE:
344                 EVM_syslog(LOG_ERR,
345                            "EVCURL: called first time "
346                            "to register this sockwatcker\n");
347                 break;
348         case CURL_POLL_REMOVE:
349                 EVM_syslog(LOG_ERR,
350                            "EVCURL: called last time to unregister "
351                            "this sockwatcher\n");
352                 ev_io_stop(event_base, &IO->recv_event);
353                 ev_io_stop(event_base, &IO->send_event);
354                 break;
355         case CURL_POLL_IN:
356                 ev_io_start(event_base, &IO->recv_event);
357                 ev_io_stop(event_base, &IO->send_event);
358                 break;
359         case CURL_POLL_OUT:
360                 ev_io_start(event_base, &IO->send_event);
361                 ev_io_stop(event_base, &IO->recv_event);
362                 break;
363         case CURL_POLL_INOUT:
364                 ev_io_start(event_base, &IO->send_event);
365                 ev_io_start(event_base, &IO->recv_event);
366                 break;
367         }
368         return 0;
369 }
370
371 void curl_init_connectionpool(void)
372 {
373         CURLM *mhnd ;
374
375         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
376         global.timeev.data = (void *)&global;
377         global.nrun = -1;
378         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
379
380         if (sta)
381         {
382                 syslog(LOG_ERR,
383                        "EVCURL: error initializing curl library: %s\n",
384                        curl_easy_strerror(sta));
385
386                 exit(1);
387         }
388         mhnd = global.mhnd = curl_multi_init();
389         if (!mhnd)
390         {
391                 syslog(LOG_ERR,
392                        "EVCURL: error initializing curl multi handle\n");
393                 exit(3);
394         }
395
396         MOPT(SOCKETFUNCTION, &gotwatchsock);
397         MOPT(SOCKETDATA, (void *)&global);
398         MOPT(TIMERFUNCTION, &gotwatchtime);
399         MOPT(TIMERDATA, (void *)&global);
400
401         return;
402 }
403
404 int evcurl_init(AsyncIO *IO)
405 {
406         CURLcode sta;
407         CURL *chnd;
408
409         EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
410         IO->HttpReq.attached = 0;
411         chnd = IO->HttpReq.chnd = curl_easy_init();
412         if (!chnd)
413         {
414                 EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
415                 return 0;
416         }
417
418 #if DEBUG
419         OPT(VERBOSE, (long)1);
420 #endif
421         OPT(NOPROGRESS, 1L);
422
423         OPT(NOSIGNAL, 1L);
424         OPT(FAILONERROR, (long)1);
425         OPT(ENCODING, "");
426         OPT(FOLLOWLOCATION, (long)0);
427         OPT(MAXREDIRS, (long)0);
428         OPT(USERAGENT, CITADEL);
429
430         OPT(TIMEOUT, (long)1800);
431         OPT(LOW_SPEED_LIMIT, (long)64);
432         OPT(LOW_SPEED_TIME, (long)600);
433         OPT(CONNECTTIMEOUT, (long)600);
434         OPT(PRIVATE, (void *)IO);
435
436         OPT(FORBID_REUSE, 1);
437         OPT(WRITEFUNCTION, &gotdata);
438         OPT(WRITEDATA, (void *)IO);
439         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
440
441         if ((!IsEmptyStr(config.c_ip_addr))
442                 && (strcmp(config.c_ip_addr, "*"))
443                 && (strcmp(config.c_ip_addr, "::"))
444                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
445                 )
446         {
447                 OPT(INTERFACE, config.c_ip_addr);
448         }
449
450 #ifdef CURLOPT_HTTP_CONTENT_DECODING
451         OPT(HTTP_CONTENT_DECODING, 1);
452         OPT(ENCODING, "");
453 #endif
454
455         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
456                                                 "Connection: close");
457
458         return 1;
459 }
460
461
462 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
463                                            ev_cleanup *watcher,
464                                            int revents)
465 {
466         CURLMcode msta;
467         AsyncIO *IO = watcher->data;
468         IO->Now = ev_now(event_base);
469         EV_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
470
471         curl_slist_free_all(IO->HttpReq.headers);
472         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
473         if (msta)
474         {
475                 EV_syslog(LOG_ERR,
476                           "EVCURL: warning problem detaching completed handle "
477                           "from curl multi: %s\n",
478                           curl_multi_strerror(msta));
479         }
480
481         curl_easy_cleanup(IO->HttpReq.chnd);
482         IO->HttpReq.chnd = NULL;
483         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
484         ev_io_stop(event_base, &IO->recv_event);
485         ev_io_stop(event_base, &IO->send_event);
486         assert(IO->ShutdownAbort);
487         IO->ShutdownAbort(IO);
488 }
489 eNextState
490 evcurl_handle_start(AsyncIO *IO)
491 {
492         CURLMcode msta;
493         CURLcode sta;
494         CURL *chnd;
495
496         chnd = IO->HttpReq.chnd;
497         EV_syslog(LOG_DEBUG,
498                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
499         OPT(URL, IO->ConnectMe->PlainUrl);
500         if (StrLength(IO->ConnectMe->CurlCreds))
501         {
502                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
503                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
504         }
505         if (StrLength(IO->HttpReq.PostData) > 0)
506         {
507                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
508                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
509
510         }
511         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
512                  (IO->HttpReq.PlainPostData != NULL))
513         {
514                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
515                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
516         }
517         OPT(HTTPHEADER, IO->HttpReq.headers);
518
519         IO->NextState = eConnect;
520         EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
521         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
522         if (msta)
523         {
524                 EV_syslog(LOG_ERR,
525                           "EVCURL: error attaching to curl multi handle: %s\n",
526                           curl_multi_strerror(msta));
527         }
528
529         IO->HttpReq.attached = 1;
530         ev_async_send (event_base, &WakeupCurl);
531         ev_cleanup_init(&IO->abort_by_shutdown,
532                         IOcurl_abort_shutdown_callback);
533
534         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
535         return eReadMessage;
536 }
537
538 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
539 {
540         syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
541
542         curl_multi_perform(&global, CURL_POLL_NONE);
543 }
544
545 static void evcurl_shutdown (void)
546 {
547         curl_global_cleanup();
548         curl_multi_cleanup(global.mhnd);
549         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
550 }
551 /*****************************************************************************
552  *                       libevent integration                                *
553  *****************************************************************************/
554 /*
555  * client event queue plus its methods.
556  * this currently is the main loop (which may change in some future?)
557  */
558 int evbase_count = 0;
559 int event_add_pipe[2] = {-1, -1};
560 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
561 HashList *QueueEvents = NULL;
562 HashList *InboundEventQueue = NULL;
563 HashList *InboundEventQueues[2] = { NULL, NULL };
564
565 ev_async AddJob;
566 ev_async ExitEventLoop;
567
568 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
569 {
570         ev_tstamp Now;
571         HashList *q;
572         void *v;
573         HashPos*It;
574         long len;
575         const char *Key;
576
577         /* get the control command... */
578         pthread_mutex_lock(&EventQueueMutex);
579
580         if (InboundEventQueues[0] == InboundEventQueue) {
581                 InboundEventQueue = InboundEventQueues[1];
582                 q = InboundEventQueues[0];
583         }
584         else {
585                 InboundEventQueue = InboundEventQueues[0];
586                 q = InboundEventQueues[1];
587         }
588         pthread_mutex_unlock(&EventQueueMutex);
589         Now = ev_now (event_base);
590         It = GetNewHashPos(q, 0);
591         while (GetNextHashPos(q, It, &len, &Key, &v))
592         {
593                 IOAddHandler *h = v;
594                 if (h->IO->ID == 0) {
595                         h->IO->ID = EvIDSource++;
596                 }
597                 if (h->IO->StartIO == 0.0)
598                         h->IO->StartIO = Now;
599                 h->IO->Now = Now;
600                 h->EvAttch(h->IO);
601         }
602         DeleteHashPos(&It);
603         DeleteHashContent(&q);
604         syslog(LOG_DEBUG, "EVENT Q Add done.\n");
605 }
606
607
608 static void EventExitCallback(EV_P_ ev_async *w, int revents)
609 {
610         ev_break(event_base, EVBREAK_ALL);
611
612         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
613 }
614
615
616
617 void InitEventQueue(void)
618 {
619         struct rlimit LimitSet;
620
621         pthread_mutex_init(&EventQueueMutex, NULL);
622
623         if (pipe(event_add_pipe) != 0) {
624                 syslog(LOG_EMERG,
625                        "Unable to create pipe for libev queueing: %s\n",
626                        strerror(errno));
627                 abort();
628         }
629         LimitSet.rlim_cur = 1;
630         LimitSet.rlim_max = 1;
631         setrlimit(event_add_pipe[1], &LimitSet);
632
633         QueueEvents = NewHash(1, Flathash);
634         InboundEventQueues[0] = NewHash(1, Flathash);
635         InboundEventQueues[1] = NewHash(1, Flathash);
636         InboundEventQueue = InboundEventQueues[0];
637 }
638 extern void CtdlDestroyEVCleanupHooks(void);
639
640 extern int EVQShutDown;
641 /*
642  * this thread operates the select() etc. via libev.
643  */
644 void *client_event_thread(void *arg) 
645 {
646         struct CitContext libev_client_CC;
647
648         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
649 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
650         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
651
652         event_base = ev_default_loop (EVFLAG_AUTO);
653         ev_async_init(&AddJob, QueueEventAddCallback);
654         ev_async_start(event_base, &AddJob);
655         ev_async_init(&ExitEventLoop, EventExitCallback);
656         ev_async_start(event_base, &ExitEventLoop);
657         ev_async_init(&WakeupCurl, WakeupCurlCallback);
658         ev_async_start(event_base, &WakeupCurl);
659
660         curl_init_connectionpool();
661
662         ev_run (event_base, 0);
663
664         syslog(LOG_DEBUG, "client_event_thread() exiting\n");
665
666 ///what todo here?      CtdlClearSystemContext();
667         ev_loop_destroy (EV_DEFAULT_UC);
668         DeleteHash(&QueueEvents);
669         InboundEventQueue = NULL;
670         DeleteHash(&InboundEventQueues[0]);
671         DeleteHash(&InboundEventQueues[1]);
672 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
673         evcurl_shutdown();
674         close(event_add_pipe[0]);
675         close(event_add_pipe[1]);
676
677         CtdlDestroyEVCleanupHooks();
678
679         EVQShutDown = 1;        
680         return(NULL);
681 }
682
683 /*----------------------------------------------------------------------------*/
684 /*
685  * DB-Queue; does async bdb operations.
686  * has its own set of handlers.
687  */
688 ev_loop *event_db;
689 int evdb_count = 0;
690 int evdb_add_pipe[2] = {-1, -1};
691 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
692 HashList *DBQueueEvents = NULL;
693 HashList *DBInboundEventQueue = NULL;
694 HashList *DBInboundEventQueues[2] = { NULL, NULL };
695
696 ev_async DBAddJob;
697 ev_async DBExitEventLoop;
698
699 extern void ShutDownDBCLient(AsyncIO *IO);
700
701 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
702 {
703         ev_tstamp Now;
704         HashList *q;
705         void *v;
706         HashPos *It;
707         long len;
708         const char *Key;
709
710         /* get the control command... */
711         pthread_mutex_lock(&DBEventQueueMutex);
712
713         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
714                 DBInboundEventQueue = DBInboundEventQueues[1];
715                 q = DBInboundEventQueues[0];
716         }
717         else {
718                 DBInboundEventQueue = DBInboundEventQueues[0];
719                 q = DBInboundEventQueues[1];
720         }
721         pthread_mutex_unlock(&DBEventQueueMutex);
722
723         Now = ev_now (event_db);
724         It = GetNewHashPos(q, 0);
725         while (GetNextHashPos(q, It, &len, &Key, &v))
726         {
727                 IOAddHandler *h = v;
728                 eNextState rc;
729                 if (h->IO->ID == 0)
730                         h->IO->ID = EvIDSource++;
731                 if (h->IO->StartDB == 0.0)
732                         h->IO->StartDB = Now;
733                 h->IO->Now = Now;
734                 rc = h->EvAttch(h->IO);
735                 switch (rc)
736                 {
737                 case eAbort:
738                         ShutDownDBCLient(h->IO);
739                 default:
740                         break;
741                 }
742         }
743         DeleteHashPos(&It);
744         DeleteHashContent(&q);
745         syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
746 }
747
748
749 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
750 {
751         syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
752         ev_break(event_db, EVBREAK_ALL);
753 }
754
755
756
757 void DBInitEventQueue(void)
758 {
759         struct rlimit LimitSet;
760
761         pthread_mutex_init(&DBEventQueueMutex, NULL);
762
763         if (pipe(evdb_add_pipe) != 0) {
764                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
765                 abort();
766         }
767         LimitSet.rlim_cur = 1;
768         LimitSet.rlim_max = 1;
769         setrlimit(evdb_add_pipe[1], &LimitSet);
770
771         DBQueueEvents = NewHash(1, Flathash);
772         DBInboundEventQueues[0] = NewHash(1, Flathash);
773         DBInboundEventQueues[1] = NewHash(1, Flathash);
774         DBInboundEventQueue = DBInboundEventQueues[0];
775 }
776
777 /*
778  * this thread operates writing to the message database via libev.
779  */
780 void *db_event_thread(void *arg)
781 {
782         struct CitContext libev_msg_CC;
783
784         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
785 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
786         syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
787
788         event_db = ev_loop_new (EVFLAG_AUTO);
789
790         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
791         ev_async_start(event_db, &DBAddJob);
792         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
793         ev_async_start(event_db, &DBExitEventLoop);
794
795         ev_run (event_db, 0);
796
797         syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
798
799 //// what to do here?   CtdlClearSystemContext();
800         ev_loop_destroy (event_db);
801
802         DeleteHash(&DBQueueEvents);
803         DBInboundEventQueue = NULL;
804         DeleteHash(&DBInboundEventQueues[0]);
805         DeleteHash(&DBInboundEventQueues[1]);
806
807         close(evdb_add_pipe[0]);
808         close(evdb_add_pipe[1]);
809 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
810
811         return(NULL);
812 }
813
814 void ShutDownEventQueues(void)
815 {
816         syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
817
818         pthread_mutex_lock(&DBEventQueueMutex);
819         ev_async_send (event_db, &DBExitEventLoop);
820         pthread_mutex_unlock(&DBEventQueueMutex);
821
822         pthread_mutex_lock(&EventQueueMutex);
823         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
824         pthread_mutex_unlock(&EventQueueMutex);
825 }
826
827 void DebugEventloopEnable(void)
828 {
829         DebugEventLoop = 1;
830 }
831
832 void DebugCurlEnable(void)
833 {
834         DebugCurl = 1;
835 }
836
837 CTDL_MODULE_INIT(event_client)
838 {
839         if (!threading)
840         {
841                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable);
842                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable);
843                 InitEventQueue();
844                 DBInitEventQueue();
845                 CtdlThreadCreate(client_event_thread);
846                 CtdlThreadCreate(db_event_thread);
847         }
848         return "event";
849 }