aa42fe75478f2d9df080c51c60c9a5c9d03b83d1
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include <curl/curl.h>
50 #include <curl/multi.h>
51 #include "citadel.h"
52 #include "server.h"
53 #include "citserver.h"
54 #include "support.h"
55
56 #include "ctdl_module.h"
57
58 #include "event_client.h"
59 #include "serv_curl.h"
60
/* Event loop used by the client event thread; assigned in client_event_thread(). */
ev_loop *event_base;
62
63 /*****************************************************************************
64  *                   libevent / curl integration                             *
65  *****************************************************************************/
/*
 * MOPT(s, v): set option CURLMOPT_<s> to value v on the curl multi handle.
 *
 * Requires a variable named `mhnd` (the CURLM handle) in the calling scope.
 * curl_multi_setopt() reports a CURLMcode, so the status is kept in a
 * macro-local variable of that type and decoded with curl_multi_strerror().
 * Any failure is logged and terminates the process with exit(1).
 */
#define MOPT(s, v)							\
	do {								\
		CURLMcode mopt_msta_ = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
		if (mopt_msta_ != CURLM_OK) {				\
			syslog(LOG_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_multi_strerror(mopt_msta_)); \
			exit (1);					\
		}							\
	} while (0)
74
/* Shared state of the curl multi interface that is driven from the event loop. */
typedef struct _evcurl_global_data {
        int magic;              /* not referenced by the code in this file section */
        CURLM *mhnd;            /* multi handle, created in curl_init_connectionpool() */
        ev_timer timeev;        /* timeout watcher, re-armed by gotwatchtime() */
        int nrun;               /* running-handle count last recorded by gotstatus() */
} evcurl_global_data;

/* Async watcher signalled by evcurl_handle_start() after a handle was attached. */
ev_async WakeupCurl;
/* The single instance of the curl/event-loop glue state. */
evcurl_global_data global;
84
85 static void
86 gotstatus(int nnrun) 
87 {
88         CURLM *mhnd;
89         CURLMsg *msg;
90         int nmsg;
91
92         global.nrun = nnrun;
93         mhnd = global.mhnd;
94
95         syslog(LOG_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
96         while ((msg = curl_multi_info_read(mhnd, &nmsg))) {
97                 syslog(LOG_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg);
98                 if (CURLMSG_DONE == msg->msg) {
99                         CURL *chnd;
100                         char *chandle;
101                         CURLcode sta;
102                         CURLMcode msta;
103                         AsyncIO  *IO;
104
105                         chandle = NULL;;
106                         chnd = msg->easy_handle;
107                         sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle);
108                         syslog(LOG_ERR, "EVCURL: request complete\n");
109                         if (sta)
110                                 syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
111                         IO = (AsyncIO *)chandle;
112                         
113                         ev_io_stop(event_base, &IO->recv_event);
114                         ev_io_stop(event_base, &IO->send_event);
115
116                         sta = msg->data.result;
117                         if (sta) {
118                                 syslog(LOG_ERR, "EVCURL: error description: %s\n", IO->HttpReq.errdesc);
119                                 syslog(LOG_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta));
120                         }
121                         sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &IO->HttpReq.httpcode);
122                         if (sta)
123                                 syslog(LOG_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta));
124                         syslog(LOG_ERR, "EVCURL: http response code was %ld\n", (long)IO->HttpReq.httpcode);
125
126
127                         curl_slist_free_all(IO->HttpReq.headers);
128                         msta = curl_multi_remove_handle(mhnd, chnd);
129                         if (msta)
130                                 syslog(LOG_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta));
131
132                         curl_easy_cleanup(IO->HttpReq.chnd);
133                         IO->HttpReq.chnd = NULL;
134
135                         IO->HttpReq.attached = 0;
136                         IO->SendDone(IO);
137
138                         FreeStrBuf(&IO->HttpReq.ReplyData);
139                         FreeURL(&IO->ConnectMe);
140                         RemoveContext(IO->CitContext);
141                         IO->Terminate(IO);
142                 }
143         }
144 }
145
146 static void
147 stepmulti(void *data, curl_socket_t fd, int which)
148 {
149         int running_handles = 0;
150         CURLMcode msta;
151         
152         msta = curl_multi_socket_action(global.mhnd, fd, which, &running_handles);
153         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
154         if (msta)
155                 syslog(LOG_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta));
156         if (global.nrun != running_handles)
157                 gotstatus(running_handles);
158 }
159
160 static void
161 gottime(struct ev_loop *loop, ev_timer *timeev, int events) {
162         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
163         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
164 }
165
166 static void
167 got_in(struct ev_loop *loop, ev_io *ioev, int events) {
168         syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
169         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
170 }
171
172 static void
173 got_out(struct ev_loop *loop, ev_io *ioev, int events) {
174         syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
175         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
176 }
177
178 static size_t
179 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
180         AsyncIO *IO = (AsyncIO*) cglobal;
181
182         if (IO->HttpReq.ReplyData == NULL)
183         {
184             IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
185         }
186         return CurlFillStrBuf_callback(data, size, nmemb, IO->HttpReq.ReplyData);
187 }
188
189 static int
190 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
191         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
192         evcurl_global_data *global = cglobal;
193         ev_timer_stop(EV_DEFAULT, &global->timeev);
194         if (tblock_ms < 0 || 14000 < tblock_ms)
195                 tblock_ms = 14000;
196         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
197         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
198         curl_multi_perform(global, CURL_POLL_NONE);
199         return 0;
200 }
201
202 static int
203 gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) {
204         evcurl_global_data *global = cglobal;
205         CURLM *mhnd = global->mhnd;
206         char *f;
207         AsyncIO *IO = (AsyncIO*) vIO;
208         CURLcode sta;
209
210         syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
211         if (IO == NULL) {
212                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
213                 if (sta) {
214                         syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
215                         return -1;
216                 }
217                 IO = (AsyncIO *) f;
218                 if (IO->SendBuf.fd != 0)
219                 {
220                         ev_io_stop(event_base, &IO->recv_event);
221                         ev_io_stop(event_base, &IO->send_event);
222                 }
223                 IO->SendBuf.fd = fd;
224                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
225                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
226                 curl_multi_assign(mhnd, fd, IO);
227
228         }
229
230         switch (action)
231         {
232         case CURL_POLL_NONE:
233                 syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n");
234                 break;
235         case CURL_POLL_REMOVE:
236                 syslog(LOG_ERR,"EVCURL: called last time to unregister this sockwatcher\n");
237                 ev_io_stop(event_base, &IO->recv_event);
238                 ev_io_stop(event_base, &IO->send_event);
239                 break;
240         case CURL_POLL_IN:
241                 ev_io_stop(event_base, &IO->recv_event);
242                 ev_io_start(event_base, &IO->send_event);
243                 break;
244         case CURL_POLL_OUT:
245                 ev_io_stop(event_base, &IO->send_event);
246                 ev_io_start(event_base, &IO->recv_event);
247                 break;
248         case CURL_POLL_INOUT:
249                 ev_io_start(event_base, &IO->send_event);
250                 ev_io_start(event_base, &IO->recv_event);
251                 break;
252         }
253         return 0;
254 }
255
256 void curl_init_connectionpool(void) 
257 {
258         CURLM *mhnd ;
259
260         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
261         global.timeev.data = (void *)&global;
262         global.nrun = -1;
263         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
264
265         if (sta) 
266         {
267                 syslog(LOG_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta));
268                 exit(1);
269         }
270         mhnd = global.mhnd = curl_multi_init();
271         if (!mhnd)
272         {
273                 syslog(LOG_ERR,"EVCURL: error initializing curl multi handle\n");
274                 exit(3);
275         }
276
277         MOPT(SOCKETFUNCTION, &gotwatchsock);
278         MOPT(SOCKETDATA, (void *)&global);
279         MOPT(TIMERFUNCTION, &gotwatchtime);
280         MOPT(TIMERDATA, (void *)&global);
281
282         return;
283 }
284
285
286
287
288 int evcurl_init(AsyncIO *IO, 
289                 void *CustomData, 
290                 const char* Desc,
291                 IO_CallBack CallBack, 
292                 IO_CallBack Terminate)
293 {
294         CURLcode sta;
295         CURL *chnd;
296
297         syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
298         IO->HttpReq.attached = 0;
299         IO->SendDone = CallBack;
300         IO->Terminate = Terminate;
301         chnd = IO->HttpReq.chnd = curl_easy_init();
302         if (!chnd)
303         {
304                 syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
305                 return 1;
306         }
307
308         strcpy(IO->HttpReq.errdesc, Desc);
309
310         OPT(VERBOSE, (long)1);
311                 /* unset in production */
312         OPT(NOPROGRESS, (long)1); 
313         OPT(NOSIGNAL, (long)1);
314         OPT(FAILONERROR, (long)1);
315         OPT(ENCODING, "");
316         OPT(FOLLOWLOCATION, (long)1);
317         OPT(MAXREDIRS, (long)7);
318         OPT(USERAGENT, CITADEL);
319
320         OPT(TIMEOUT, (long)1800);
321         OPT(LOW_SPEED_LIMIT, (long)64);
322         OPT(LOW_SPEED_TIME, (long)600);
323         OPT(CONNECTTIMEOUT, (long)600); 
324         OPT(PRIVATE, (void *)IO);
325
326         OPT(FORBID_REUSE, 1);
327         OPT(WRITEFUNCTION, &gotdata); 
328         OPT(WRITEDATA, (void *)IO);
329         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
330
331         if (
332                 (!IsEmptyStr(config.c_ip_addr))
333                 && (strcmp(config.c_ip_addr, "*"))
334                 && (strcmp(config.c_ip_addr, "::"))
335                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
336         ) {
337                 OPT(INTERFACE, config.c_ip_addr);
338         }
339                 /* point to a structure that points back to the perl structure and stuff */
340         syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
341         OPT(URL, IO->ConnectMe->PlainUrl);
342         if (StrLength(IO->ConnectMe->CurlCreds))
343         {
344                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
345                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
346         }
347 #ifdef CURLOPT_HTTP_CONTENT_DECODING
348         OPT(HTTP_CONTENT_DECODING, 1);
349         OPT(ENCODING, "");
350 #endif
351         if (StrLength(IO->HttpReq.PostData) > 0)
352         { 
353                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
354                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
355
356         }
357         else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL))
358         {
359                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
360                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
361         }
362
363         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Connection: close");
364         OPT(HTTPHEADER, IO->HttpReq.headers);
365
366         return 1;
367 }
368
369 eNextState
370 evcurl_handle_start(AsyncIO *IO) 
371 {
372         CURLMcode msta;
373         IO->NextState = eConnect;
374         syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
375         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
376         if (msta)
377                 syslog(LOG_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta));
378         IO->HttpReq.attached = 1;
379         ev_async_send (event_base, &WakeupCurl);
380         return eReadMessage;
381 }
382
383 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
384 {
385         syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
386
387         curl_multi_perform(&global, CURL_POLL_NONE);
388 }
389
390 static void evcurl_shutdown (void)
391 {
392         curl_multi_cleanup(global.mhnd);
393 }
394 /*****************************************************************************
395  *                       libevent integration                                *
396  *****************************************************************************/
/*
 * Client event queue plus its methods.
 * This currently is the main loop (which may change in the future).
 */
int evbase_count = 0;
/* both ends are filled in by pipe() in InitEventQueue() */
int event_add_pipe[2] = {-1, -1};
pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
HashList *QueueEvents = NULL;
/* points to one of InboundEventQueues[]; QueueEventAddCallback() switches it to the other */
HashList *InboundEventQueue = NULL;
HashList *InboundEventQueues[2] = { NULL, NULL };

/* async watchers started on event_base in client_event_thread() */
ev_async AddJob;   
ev_async ExitEventLoop;
410
411 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
412 {
413         HashList *q;
414         void *v;
415         HashPos  *It;
416         long len;
417         const char *Key;
418
419         /* get the control command... */
420         pthread_mutex_lock(&EventQueueMutex);
421
422         if (InboundEventQueues[0] == InboundEventQueue) {
423                 InboundEventQueue = InboundEventQueues[1];
424                 q = InboundEventQueues[0];
425         }
426         else {
427                 InboundEventQueue = InboundEventQueues[0];
428                 q = InboundEventQueues[1];
429         }
430         pthread_mutex_unlock(&EventQueueMutex);
431
432         It = GetNewHashPos(q, 0);
433         while (GetNextHashPos(q, It, &len, &Key, &v))
434         {
435                 IOAddHandler *h = v;
436                 h->EvAttch(h->IO);
437         }
438         DeleteHashPos(&It);
439         DeleteHashContent(&q);
440         syslog(LOG_DEBUG, "EVENT Q Read done.\n");
441 }
442
443
444 static void EventExitCallback(EV_P_ ev_async *w, int revents)
445 {
446         ev_break(event_base, EVBREAK_ALL);
447
448         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
449 }
450
451
452
453 void InitEventQueue(void)
454 {
455         struct rlimit LimitSet;
456
457         pthread_mutex_init(&EventQueueMutex, NULL);
458
459         if (pipe(event_add_pipe) != 0) {
460                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
461                 abort();
462         }
463         LimitSet.rlim_cur = 1;
464         LimitSet.rlim_max = 1;
465         setrlimit(event_add_pipe[1], &LimitSet);
466
467         QueueEvents = NewHash(1, Flathash);
468         InboundEventQueues[0] = NewHash(1, Flathash);
469         InboundEventQueues[1] = NewHash(1, Flathash);
470         InboundEventQueue = InboundEventQueues[0];
471 }
472 /*
473  * this thread operates the select() etc. via libev.
474  * 
475  * 
476  */
477 void *client_event_thread(void *arg) 
478 {
479         struct CitContext libev_client_CC;
480
481         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
482 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
483         syslog(LOG_DEBUG, "client_ev_thread() initializing\n");
484
485         event_base = ev_default_loop (EVFLAG_AUTO);
486
487         ev_async_init(&AddJob, QueueEventAddCallback);
488         ev_async_start(event_base, &AddJob);
489         ev_async_init(&ExitEventLoop, EventExitCallback);
490         ev_async_start(event_base, &ExitEventLoop);
491         ev_async_init(&WakeupCurl, WakeupCurlCallback);
492         ev_async_start(event_base, &WakeupCurl);
493
494         curl_init_connectionpool();
495
496         ev_run (event_base, 0);
497
498
499 ///what todo here?      CtdlClearSystemContext();
500         ev_loop_destroy (EV_DEFAULT_UC);
501         
502         DeleteHash(&QueueEvents);
503         InboundEventQueue = NULL;
504         DeleteHash(&InboundEventQueues[0]);
505         DeleteHash(&InboundEventQueues[1]);
506 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
507         evcurl_shutdown();
508
509         return(NULL);
510 }
511 /*------------------------------------------------------------------------------*/
/*
 * DB-Queue; does async bdb operations.
 * It has its own set of handlers.
 */
/* event loop of the DB thread; created in db_event_thread() */
ev_loop *event_db;
int evdb_count = 0;
/* both ends are filled in by pipe() in DBInitEventQueue() */
int evdb_add_pipe[2] = {-1, -1};
pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
HashList *DBQueueEvents = NULL;
/* points to one of DBInboundEventQueues[]; DBQueueEventAddCallback() switches it to the other */
HashList *DBInboundEventQueue = NULL;
HashList *DBInboundEventQueues[2] = { NULL, NULL };

/* async watchers started on event_db in db_event_thread() */
ev_async DBAddJob;   
ev_async DBExitEventLoop;

/* defined elsewhere; called when a queued handler returns eAbort */
extern void ShutDownDBCLient(AsyncIO *IO);
528
529 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
530 {
531         HashList *q;
532         void *v;
533         HashPos  *It;
534         long len;
535         const char *Key;
536
537         /* get the control command... */
538         pthread_mutex_lock(&DBEventQueueMutex);
539
540         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
541                 DBInboundEventQueue = DBInboundEventQueues[1];
542                 q = DBInboundEventQueues[0];
543         }
544         else {
545                 DBInboundEventQueue = DBInboundEventQueues[0];
546                 q = DBInboundEventQueues[1];
547         }
548         pthread_mutex_unlock(&DBEventQueueMutex);
549
550         It = GetNewHashPos(q, 0);
551         while (GetNextHashPos(q, It, &len, &Key, &v))
552         {
553                 IOAddHandler *h = v;
554                 eNextState rc;
555                 rc = h->EvAttch(h->IO);
556                 switch (rc)
557                 {
558                 case eAbort:
559                     ShutDownDBCLient(h->IO);
560                 default:
561                     break;
562                 }
563         }
564         DeleteHashPos(&It);
565         DeleteHashContent(&q);
566         syslog(LOG_DEBUG, "DBEVENT Q Read done.\n");
567 }
568
569
570 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
571 {
572         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
573         ev_break(event_db, EVBREAK_ALL);
574 }
575
576
577
578 void DBInitEventQueue(void)
579 {
580         struct rlimit LimitSet;
581
582         pthread_mutex_init(&DBEventQueueMutex, NULL);
583
584         if (pipe(evdb_add_pipe) != 0) {
585                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
586                 abort();
587         }
588         LimitSet.rlim_cur = 1;
589         LimitSet.rlim_max = 1;
590         setrlimit(evdb_add_pipe[1], &LimitSet);
591
592         DBQueueEvents = NewHash(1, Flathash);
593         DBInboundEventQueues[0] = NewHash(1, Flathash);
594         DBInboundEventQueues[1] = NewHash(1, Flathash);
595         DBInboundEventQueue = DBInboundEventQueues[0];
596 }
597
598 /*
599  * this thread operates writing to the message database via libev.
600  * 
601  * 
602  */
603 void *db_event_thread(void *arg) 
604 {
605         struct CitContext libev_msg_CC;
606
607         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
608 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
609         syslog(LOG_DEBUG, "client_msgev_thread() initializing\n");
610
611         event_db = ev_loop_new (EVFLAG_AUTO);
612
613         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
614         ev_async_start(event_db, &DBAddJob);
615         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
616         ev_async_start(event_db, &DBExitEventLoop);
617
618         ev_run (event_db, 0);
619
620
621 //// what to do here?   CtdlClearSystemContext();
622         ev_loop_destroy (event_db);
623
624         DeleteHash(&DBQueueEvents);
625         DBInboundEventQueue = NULL;
626         DeleteHash(&DBInboundEventQueues[0]);
627         DeleteHash(&DBInboundEventQueues[1]);
628 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
629
630         return(NULL);
631 }
632
633 CTDL_MODULE_INIT(event_client)
634 {
635         if (!threading)
636         {
637                 InitEventQueue();
638                 DBInitEventQueue();
639                 CtdlThreadCreate(/*"Client event", */ client_event_thread);
640                 CtdlThreadCreate(/*"DB event", */db_event_thread);
641 /// todo register shutdown callback.
642         }
643         return "event";
644 }