dd3c4d403bb1fc1c8f66d7b4199d2323e38fcd31
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63
64 long EvIDSource = 1;
/*****************************************************************************
 *                      libev / curl integration                             *
 *****************************************************************************/
/*
 * MOPT(s, v): set option CURLMOPT_<s> to v on the curl multi handle `mhnd`,
 * which must be in scope at the point of use.
 *
 * curl_multi_setopt() returns a CURLMcode, so the status is kept in a
 * macro-local CURLMcode and formatted with curl_multi_strerror().
 * Any failure is logged and terminates the process with exit status 1.
 */
#define MOPT(s, v)\
	do {								\
		CURLMcode mopt_sta_ =					\
			curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));	\
		if (mopt_sta_ != CURLM_OK) {				\
			syslog(LOG_ERR, "EVCURL: error setting option "	\
			       #s " on curl multi handle: %s\n",	\
			       curl_multi_strerror(mopt_sta_));		\
			exit (1);					\
		}							\
	} while (0)
78
79 typedef struct _evcurl_global_data {
80         int magic;
81         CURLM *mhnd;
82         ev_timer timeev;
83         int nrun;
84 } evcurl_global_data;
85
86 ev_async WakeupCurl;
87 evcurl_global_data global;
88
89 static void
90 gotstatus(int nnrun)
91 {
92         CURLMsg *msg;
93         int nmsg;
94
95         global.nrun = nnrun;
96
97         syslog(LOG_DEBUG,
98                "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
99         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
100                 syslog(LOG_ERR,
101                        "EVCURL: got curl multi_info message msg=%d\n",
102                        msg->msg);
103
104                 if (CURLMSG_DONE == msg->msg) {
105                         CURL *chnd;
106                         char *chandle;
107                         CURLcode sta;
108                         CURLMcode msta;
109                         AsyncIO*IO;
110
111                         chandle = NULL;;
112                         chnd = msg->easy_handle;
113                         sta = curl_easy_getinfo(chnd,
114                                                 CURLINFO_PRIVATE,
115                                                 &chandle);
116                         syslog(LOG_ERR, "EVCURL: request complete\n");
117                         if (sta)
118                                 syslog(LOG_ERR,
119                                        "EVCURL: error asking curl for private"
120                                        " cookie of curl handle: %s\n",
121                                        curl_easy_strerror(sta));
122                         IO = (AsyncIO *)chandle;
123
124                         ev_io_stop(event_base, &IO->recv_event);
125                         ev_io_stop(event_base, &IO->send_event);
126
127                         sta = msg->data.result;
128                         if (sta) {
129                                 EV_syslog(LOG_ERR,
130                                           "EVCURL: error description: %s\n",
131                                           IO->HttpReq.errdesc);
132                                 EV_syslog(LOG_ERR,
133                                           "EVCURL: error performing request: %s\n",
134                                           curl_easy_strerror(sta));
135                         }
136                         sta = curl_easy_getinfo(chnd,
137                                                 CURLINFO_RESPONSE_CODE,
138                                                 &IO->HttpReq.httpcode);
139                         if (sta)
140                                 EV_syslog(LOG_ERR,
141                                           "EVCURL: error asking curl for "
142                                           "response code from request: %s\n",
143                                           curl_easy_strerror(sta));
144                         EV_syslog(LOG_ERR,
145                                   "EVCURL: http response code was %ld\n",
146                                   (long)IO->HttpReq.httpcode);
147
148
149                         curl_slist_free_all(IO->HttpReq.headers);
150                         msta = curl_multi_remove_handle(global.mhnd, chnd);
151                         if (msta)
152                                 EV_syslog(LOG_ERR,
153                                           "EVCURL: warning problem detaching "
154                                           "completed handle from curl multi: "
155                                           "%s\n",
156                                           curl_multi_strerror(msta));
157
158                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
159
160                         IO->HttpReq.attached = 0;
161                         switch(IO->SendDone(IO))
162                         {
163                         case eDBQuery:
164                                 curl_easy_cleanup(IO->HttpReq.chnd);
165                                 IO->HttpReq.chnd = NULL;
166                                 break;
167                         case eSendDNSQuery:
168                         case eReadDNSReply:
169                         case eConnect:
170                         case eSendReply:
171                         case eSendMore:
172                         case eSendFile:
173                         case eReadMessage:
174                         case eReadMore:
175                         case eReadPayload:
176                         case eReadFile:
177                                 curl_easy_cleanup(IO->HttpReq.chnd);
178                                 IO->HttpReq.chnd = NULL;
179                                 break;
180                         case eTerminateConnection:
181                         case eAbort:
182                                 curl_easy_cleanup(IO->HttpReq.chnd);
183                                 IO->HttpReq.chnd = NULL;
184                                 FreeStrBuf(&IO->HttpReq.ReplyData);
185                                 FreeURL(&IO->ConnectMe);
186                                 RemoveContext(IO->CitContext);
187                                 IO->Terminate(IO);
188                         }
189                 }
190         }
191 }
192
193 static void
194 stepmulti(void *data, curl_socket_t fd, int which)
195 {
196         int running_handles = 0;
197         CURLMcode msta;
198
199         msta = curl_multi_socket_action(global.mhnd,
200                                         fd,
201                                         which,
202                                         &running_handles);
203
204         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
205         if (msta)
206                 syslog(LOG_ERR,
207                        "EVCURL: error in curl processing events"
208                        "on multi handle, fd %d: %s\n",
209                        (int)fd,
210                        curl_multi_strerror(msta));
211
212         if (global.nrun != running_handles)
213                 gotstatus(running_handles);
214 }
215
216 static void
217 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
218 {
219         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
220         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
221 }
222
223 static void
224 got_in(struct ev_loop *loop, ev_io *ioev, int events)
225 {
226         syslog(LOG_DEBUG,
227                "EVCURL: waking up curl for io on fd %d\n",
228                (int)ioev->fd);
229
230         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
231 }
232
233 static void
234 got_out(struct ev_loop *loop, ev_io *ioev, int events)
235 {
236         syslog(LOG_DEBUG,
237                "EVCURL: waking up curl for io on fd %d\n",
238                (int)ioev->fd);
239
240         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
241 }
242
243 static size_t
244 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
245         AsyncIO *IO = (AsyncIO*) cglobal;
246
247         if (IO->HttpReq.ReplyData == NULL)
248         {
249                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
250         }
251         return CurlFillStrBuf_callback(data,
252                                        size,
253                                        nmemb,
254                                        IO->HttpReq.ReplyData);
255 }
256
257 static int
258 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
259         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
260         evcurl_global_data *global = cglobal;
261         ev_timer_stop(EV_DEFAULT, &global->timeev);
262         if (tblock_ms < 0 || 14000 < tblock_ms)
263                 tblock_ms = 14000;
264         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
265         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
266         curl_multi_perform(global, &global->nrun);
267         return 0;
268 }
269
270 static int
271 gotwatchsock(CURL *easy,
272              curl_socket_t fd,
273              int action,
274              void *cglobal,
275              void *vIO)
276 {
277         evcurl_global_data *global = cglobal;
278         CURLM *mhnd = global->mhnd;
279         char *f;
280         AsyncIO *IO = (AsyncIO*) vIO;
281         CURLcode sta;
282         const char *Action;
283
284         if (IO == NULL) {
285                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
286                 if (sta) {
287                         EV_syslog(LOG_ERR,
288                                   "EVCURL: error asking curl for private "
289                                   "cookie of curl handle: %s\n",
290                                   curl_easy_strerror(sta));
291                         return -1;
292                 }
293                 IO = (AsyncIO *) f;
294                 EV_syslog(LOG_DEBUG,
295                           "EVCURL: got socket for URL: %s\n",
296                           IO->ConnectMe->PlainUrl);
297
298                 if (IO->SendBuf.fd != 0)
299                 {
300                         ev_io_stop(event_base, &IO->recv_event);
301                         ev_io_stop(event_base, &IO->send_event);
302                 }
303                 IO->SendBuf.fd = fd;
304                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
305                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
306                 curl_multi_assign(mhnd, fd, IO);
307         }
308
309         Action = "";
310         switch (action)
311         {
312         case CURL_POLL_NONE:
313                 Action = "CURL_POLL_NONE";
314                 break;
315         case CURL_POLL_REMOVE:
316                 Action = "CURL_POLL_REMOVE";
317                 break;
318         case CURL_POLL_IN:
319                 Action = "CURL_POLL_IN";
320                 break;
321         case CURL_POLL_OUT:
322                 Action = "CURL_POLL_OUT";
323                 break;
324         case CURL_POLL_INOUT:
325                 Action = "CURL_POLL_INOUT";
326                 break;
327         }
328
329
330         EV_syslog(LOG_DEBUG,
331                   "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
332                   (int)fd, Action, action);
333
334         switch (action)
335         {
336         case CURL_POLL_NONE:
337                 EVM_syslog(LOG_ERR,
338                            "EVCURL: called first time "
339                            "to register this sockwatcker\n");
340                 break;
341         case CURL_POLL_REMOVE:
342                 EVM_syslog(LOG_ERR,
343                            "EVCURL: called last time to unregister "
344                            "this sockwatcher\n");
345                 ev_io_stop(event_base, &IO->recv_event);
346                 ev_io_stop(event_base, &IO->send_event);
347                 break;
348         case CURL_POLL_IN:
349                 ev_io_start(event_base, &IO->recv_event);
350                 ev_io_stop(event_base, &IO->send_event);
351                 break;
352         case CURL_POLL_OUT:
353                 ev_io_start(event_base, &IO->send_event);
354                 ev_io_stop(event_base, &IO->recv_event);
355                 break;
356         case CURL_POLL_INOUT:
357                 ev_io_start(event_base, &IO->send_event);
358                 ev_io_start(event_base, &IO->recv_event);
359                 break;
360         }
361         return 0;
362 }
363
364 void curl_init_connectionpool(void)
365 {
366         CURLM *mhnd ;
367
368         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
369         global.timeev.data = (void *)&global;
370         global.nrun = -1;
371         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
372
373         if (sta)
374         {
375                 syslog(LOG_ERR,
376                        "EVCURL: error initializing curl library: %s\n",
377                        curl_easy_strerror(sta));
378
379                 exit(1);
380         }
381         mhnd = global.mhnd = curl_multi_init();
382         if (!mhnd)
383         {
384                 syslog(LOG_ERR,
385                        "EVCURL: error initializing curl multi handle\n");
386                 exit(3);
387         }
388
389         MOPT(SOCKETFUNCTION, &gotwatchsock);
390         MOPT(SOCKETDATA, (void *)&global);
391         MOPT(TIMERFUNCTION, &gotwatchtime);
392         MOPT(TIMERDATA, (void *)&global);
393
394         return;
395 }
396
397 int evcurl_init(AsyncIO *IO)
398 {
399         CURLcode sta;
400         CURL *chnd;
401
402         EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
403         IO->HttpReq.attached = 0;
404         chnd = IO->HttpReq.chnd = curl_easy_init();
405         if (!chnd)
406         {
407                 EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
408                 return 0;
409         }
410
411 #if DEBUG
412         OPT(VERBOSE, (long)1);
413 #endif
414         OPT(NOPROGRESS, 1L);
415
416         OPT(NOSIGNAL, 1L);
417         OPT(FAILONERROR, (long)1);
418         OPT(ENCODING, "");
419         OPT(FOLLOWLOCATION, (long)0);
420         OPT(MAXREDIRS, (long)0);
421         OPT(USERAGENT, CITADEL);
422
423         OPT(TIMEOUT, (long)1800);
424         OPT(LOW_SPEED_LIMIT, (long)64);
425         OPT(LOW_SPEED_TIME, (long)600);
426         OPT(CONNECTTIMEOUT, (long)600);
427         OPT(PRIVATE, (void *)IO);
428
429         OPT(FORBID_REUSE, 1);
430         OPT(WRITEFUNCTION, &gotdata);
431         OPT(WRITEDATA, (void *)IO);
432         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
433
434         if ((!IsEmptyStr(config.c_ip_addr))
435                 && (strcmp(config.c_ip_addr, "*"))
436                 && (strcmp(config.c_ip_addr, "::"))
437                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
438                 )
439         {
440                 OPT(INTERFACE, config.c_ip_addr);
441         }
442
443 #ifdef CURLOPT_HTTP_CONTENT_DECODING
444         OPT(HTTP_CONTENT_DECODING, 1);
445         OPT(ENCODING, "");
446 #endif
447
448         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
449                                                 "Connection: close");
450
451         return 1;
452 }
453
454
455 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
456                                            ev_cleanup *watcher,
457                                            int revents)
458 {
459         CURLMcode msta;
460         AsyncIO *IO = watcher->data;
461         EV_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
462
463         curl_slist_free_all(IO->HttpReq.headers);
464         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
465         if (msta)
466         {
467                 EV_syslog(LOG_ERR,
468                           "EVCURL: warning problem detaching completed handle "
469                           "from curl multi: %s\n",
470                           curl_multi_strerror(msta));
471         }
472
473         curl_easy_cleanup(IO->HttpReq.chnd);
474         IO->HttpReq.chnd = NULL;
475         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
476         ev_io_stop(event_base, &IO->recv_event);
477         ev_io_stop(event_base, &IO->send_event);
478         assert(IO->ShutdownAbort);
479         IO->ShutdownAbort(IO);
480 }
481 eNextState
482 evcurl_handle_start(AsyncIO *IO)
483 {
484         CURLMcode msta;
485         CURLcode sta;
486         CURL *chnd;
487
488         chnd = IO->HttpReq.chnd;
489         EV_syslog(LOG_DEBUG,
490                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
491         OPT(URL, IO->ConnectMe->PlainUrl);
492         if (StrLength(IO->ConnectMe->CurlCreds))
493         {
494                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
495                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
496         }
497         if (StrLength(IO->HttpReq.PostData) > 0)
498         {
499                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
500                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
501
502         }
503         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
504                  (IO->HttpReq.PlainPostData != NULL))
505         {
506                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
507                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
508         }
509         OPT(HTTPHEADER, IO->HttpReq.headers);
510
511         IO->NextState = eConnect;
512         EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
513         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
514         if (msta)
515         {
516                 EV_syslog(LOG_ERR,
517                           "EVCURL: error attaching to curl multi handle: %s\n",
518                           curl_multi_strerror(msta));
519         }
520
521         IO->HttpReq.attached = 1;
522         ev_async_send (event_base, &WakeupCurl);
523         ev_cleanup_init(&IO->abort_by_shutdown,
524                         IOcurl_abort_shutdown_callback);
525
526         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
527         return eReadMessage;
528 }
529
530 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
531 {
532         syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
533
534         curl_multi_perform(&global, CURL_POLL_NONE);
535 }
536
537 static void evcurl_shutdown (void)
538 {
539         curl_global_cleanup();
540         curl_multi_cleanup(global.mhnd);
541         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
542 }
/*****************************************************************************
 *                          libev integration                                *
 *****************************************************************************/
546 /*
547  * client event queue plus its methods.
548  * this currently is the main loop (which may change in some future?)
549  */
550 int evbase_count = 0;
551 int event_add_pipe[2] = {-1, -1};
552 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
553 HashList *QueueEvents = NULL;
554 HashList *InboundEventQueue = NULL;
555 HashList *InboundEventQueues[2] = { NULL, NULL };
556
557 ev_async AddJob;
558 ev_async ExitEventLoop;
559
560 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
561 {
562         HashList *q;
563         void *v;
564         HashPos*It;
565         long len;
566         const char *Key;
567
568         /* get the control command... */
569         pthread_mutex_lock(&EventQueueMutex);
570
571         if (InboundEventQueues[0] == InboundEventQueue) {
572                 InboundEventQueue = InboundEventQueues[1];
573                 q = InboundEventQueues[0];
574         }
575         else {
576                 InboundEventQueue = InboundEventQueues[0];
577                 q = InboundEventQueues[1];
578         }
579         pthread_mutex_unlock(&EventQueueMutex);
580
581         It = GetNewHashPos(q, 0);
582         while (GetNextHashPos(q, It, &len, &Key, &v))
583         {
584                 IOAddHandler *h = v;
585                 if (h->IO->ID == 0)
586                         h->IO->ID = EvIDSource++;
587                 h->EvAttch(h->IO);
588         }
589         DeleteHashPos(&It);
590         DeleteHashContent(&q);
591         syslog(LOG_DEBUG, "EVENT Q Add done.\n");
592 }
593
594
595 static void EventExitCallback(EV_P_ ev_async *w, int revents)
596 {
597         ev_break(event_base, EVBREAK_ALL);
598
599         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
600 }
601
602
603
604 void InitEventQueue(void)
605 {
606         struct rlimit LimitSet;
607
608         pthread_mutex_init(&EventQueueMutex, NULL);
609
610         if (pipe(event_add_pipe) != 0) {
611                 syslog(LOG_EMERG,
612                        "Unable to create pipe for libev queueing: %s\n",
613                        strerror(errno));
614                 abort();
615         }
616         LimitSet.rlim_cur = 1;
617         LimitSet.rlim_max = 1;
618         setrlimit(event_add_pipe[1], &LimitSet);
619
620         QueueEvents = NewHash(1, Flathash);
621         InboundEventQueues[0] = NewHash(1, Flathash);
622         InboundEventQueues[1] = NewHash(1, Flathash);
623         InboundEventQueue = InboundEventQueues[0];
624 }
/* Defined outside this file; both are used by client_event_thread(). */
extern void CtdlDestroyEVCleanupHooks(void);
extern int EVQShutDown;
628 /*
629  * this thread operates the select() etc. via libev.
630  */
631 void *client_event_thread(void *arg) 
632 {
633         struct CitContext libev_client_CC;
634
635         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
636 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
637         syslog(LOG_DEBUG, "client_event_thread() initializing\n");
638
639         event_base = ev_default_loop (EVFLAG_AUTO);
640         ev_async_init(&AddJob, QueueEventAddCallback);
641         ev_async_start(event_base, &AddJob);
642         ev_async_init(&ExitEventLoop, EventExitCallback);
643         ev_async_start(event_base, &ExitEventLoop);
644         ev_async_init(&WakeupCurl, WakeupCurlCallback);
645         ev_async_start(event_base, &WakeupCurl);
646
647         curl_init_connectionpool();
648
649         ev_run (event_base, 0);
650
651         syslog(LOG_DEBUG, "client_event_thread() exiting\n");
652
653 ///what todo here?      CtdlClearSystemContext();
654         ev_loop_destroy (EV_DEFAULT_UC);
655         DeleteHash(&QueueEvents);
656         InboundEventQueue = NULL;
657         DeleteHash(&InboundEventQueues[0]);
658         DeleteHash(&InboundEventQueues[1]);
659 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
660         evcurl_shutdown();
661         close(event_add_pipe[0]);
662         close(event_add_pipe[1]);
663
664         CtdlDestroyEVCleanupHooks();
665
666         EVQShutDown = 1;        
667         return(NULL);
668 }
669
670 /*----------------------------------------------------------------------------*/
671 /*
672  * DB-Queue; does async bdb operations.
673  * has its own set of handlers.
674  */
675 ev_loop *event_db;
676 int evdb_count = 0;
677 int evdb_add_pipe[2] = {-1, -1};
678 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
679 HashList *DBQueueEvents = NULL;
680 HashList *DBInboundEventQueue = NULL;
681 HashList *DBInboundEventQueues[2] = { NULL, NULL };
682
683 ev_async DBAddJob;
684 ev_async DBExitEventLoop;
685
686 extern void ShutDownDBCLient(AsyncIO *IO);
687
688 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
689 {
690         HashList *q;
691         void *v;
692         HashPos *It;
693         long len;
694         const char *Key;
695
696         /* get the control command... */
697         pthread_mutex_lock(&DBEventQueueMutex);
698
699         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
700                 DBInboundEventQueue = DBInboundEventQueues[1];
701                 q = DBInboundEventQueues[0];
702         }
703         else {
704                 DBInboundEventQueue = DBInboundEventQueues[0];
705                 q = DBInboundEventQueues[1];
706         }
707         pthread_mutex_unlock(&DBEventQueueMutex);
708
709         It = GetNewHashPos(q, 0);
710         while (GetNextHashPos(q, It, &len, &Key, &v))
711         {
712                 IOAddHandler *h = v;
713                 eNextState rc;
714                 if (h->IO->ID == 0)
715                         h->IO->ID = EvIDSource++;
716                 rc = h->EvAttch(h->IO);
717                 switch (rc)
718                 {
719                 case eAbort:
720                         ShutDownDBCLient(h->IO);
721                 default:
722                         break;
723                 }
724         }
725         DeleteHashPos(&It);
726         DeleteHashContent(&q);
727         syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
728 }
729
730
731 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
732 {
733         syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
734         ev_break(event_db, EVBREAK_ALL);
735 }
736
737
738
739 void DBInitEventQueue(void)
740 {
741         struct rlimit LimitSet;
742
743         pthread_mutex_init(&DBEventQueueMutex, NULL);
744
745         if (pipe(evdb_add_pipe) != 0) {
746                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
747                 abort();
748         }
749         LimitSet.rlim_cur = 1;
750         LimitSet.rlim_max = 1;
751         setrlimit(evdb_add_pipe[1], &LimitSet);
752
753         DBQueueEvents = NewHash(1, Flathash);
754         DBInboundEventQueues[0] = NewHash(1, Flathash);
755         DBInboundEventQueues[1] = NewHash(1, Flathash);
756         DBInboundEventQueue = DBInboundEventQueues[0];
757 }
758
759 /*
760  * this thread operates writing to the message database via libev.
761  */
762 void *db_event_thread(void *arg)
763 {
764         struct CitContext libev_msg_CC;
765
766         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
767 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
768         syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
769
770         event_db = ev_loop_new (EVFLAG_AUTO);
771
772         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
773         ev_async_start(event_db, &DBAddJob);
774         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
775         ev_async_start(event_db, &DBExitEventLoop);
776
777         ev_run (event_db, 0);
778
779         syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
780
781 //// what to do here?   CtdlClearSystemContext();
782         ev_loop_destroy (event_db);
783
784         DeleteHash(&DBQueueEvents);
785         DBInboundEventQueue = NULL;
786         DeleteHash(&DBInboundEventQueues[0]);
787         DeleteHash(&DBInboundEventQueues[1]);
788
789         close(evdb_add_pipe[0]);
790         close(evdb_add_pipe[1]);
791 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
792
793         return(NULL);
794 }
795
796 void ShutDownEventQueues(void)
797 {
798         syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
799
800         pthread_mutex_lock(&DBEventQueueMutex);
801         ev_async_send (event_db, &DBExitEventLoop);
802         pthread_mutex_unlock(&DBEventQueueMutex);
803
804         pthread_mutex_lock(&EventQueueMutex);
805         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
806         pthread_mutex_unlock(&EventQueueMutex);
807 }
808
809 CTDL_MODULE_INIT(event_client)
810 {
811         if (!threading)
812         {
813                 InitEventQueue();
814                 DBInitEventQueue();
815                 CtdlThreadCreate(client_event_thread);
816                 CtdlThreadCreate(db_event_thread);
817         }
818         return "event";
819 }