Work on evented RSS client & libev+libcurl integration
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include <curl/curl.h>
50 #include <curl/multi.h>
51 #include "citadel.h"
52 #include "server.h"
53 #include "citserver.h"
54 #include "support.h"
55
56 #include "ctdl_module.h"
57
58 #include "event_client.h"
59 #include "serv_curl.h"
60
61 ev_loop *event_base;
62
63 long EvIDSource = 1;
/*****************************************************************************
 *                   libev / curl integration                                *
 *****************************************************************************/
/*
 * Set one option on the curl multi handle `mhnd` (which must be in scope at
 * the point of use) and terminate the process if libcurl rejects it.
 *
 * curl_multi_setopt() reports a CURLMcode, so the status is kept in a local
 * of that type and decoded with curl_multi_strerror(); the macro no longer
 * relies on a caller-provided `sta` variable.
 */
#define MOPT(s, v)							\
	do {								\
		CURLMcode mopt_sta_ = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
		if (mopt_sta_ != CURLM_OK) {				\
			syslog(LOG_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_multi_strerror(mopt_sta_)); \
			exit (1);					\
		}							\
	} while (0)
75
76 typedef struct _evcurl_global_data {
77         int magic;
78         CURLM *mhnd;
79         ev_timer timeev;
80         int nrun;
81 } evcurl_global_data;
82
83 ev_async WakeupCurl;
84 evcurl_global_data global;
85
86 static void
87 gotstatus(int nnrun) 
88 {
89         CURLM *mhnd;
90         CURLMsg *msg;
91         int nmsg;
92
93         global.nrun = nnrun;
94         mhnd = global.mhnd;
95
96         syslog(LOG_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
97         while ((msg = curl_multi_info_read(mhnd, &nmsg))) {
98                 syslog(LOG_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg);
99                 if (CURLMSG_DONE == msg->msg) {
100                         CURL *chnd;
101                         char *chandle;
102                         CURLcode sta;
103                         CURLMcode msta;
104                         AsyncIO  *IO;
105
106                         chandle = NULL;;
107                         chnd = msg->easy_handle;
108                         sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle);
109                         syslog(LOG_ERR, "EVCURL: request complete\n");
110                         if (sta)
111                                 syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
112                         IO = (AsyncIO *)chandle;
113                         
114                         ev_io_stop(event_base, &IO->recv_event);
115                         ev_io_stop(event_base, &IO->send_event);
116
117                         sta = msg->data.result;
118                         if (sta) {
119                                 EV_syslog(LOG_ERR, "EVCURL: error description: %s\n", IO->HttpReq.errdesc);
120                                 EV_syslog(LOG_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta));
121                         }
122                         sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &IO->HttpReq.httpcode);
123                         if (sta)
124                                 EV_syslog(LOG_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta));
125                         EV_syslog(LOG_ERR, "EVCURL: http response code was %ld\n", (long)IO->HttpReq.httpcode);
126
127
128                         curl_slist_free_all(IO->HttpReq.headers);
129                         msta = curl_multi_remove_handle(mhnd, chnd);
130                         if (msta)
131                                 EV_syslog(LOG_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta));
132
133                         curl_easy_cleanup(IO->HttpReq.chnd);
134                         IO->HttpReq.chnd = NULL;
135
136                         IO->HttpReq.attached = 0;
137                         switch(IO->SendDone(IO))
138                         {
139                         case eDBQuery:
140                                 break;
141                         case eSendDNSQuery:
142                         case eReadDNSReply:
143                         case eConnect:
144                         case eSendReply: 
145                         case eSendMore:
146                         case eSendFile:
147                         case eReadMessage: 
148                         case eReadMore:
149                         case eReadPayload:
150                         case eReadFile:
151                                 break;
152                         case eTerminateConnection:
153                         case eAbort:
154                                 FreeStrBuf(&IO->HttpReq.ReplyData);
155                                 FreeURL(&IO->ConnectMe);
156                                 RemoveContext(IO->CitContext);
157                                 IO->Terminate(IO);
158                         }
159                 }
160         }
161 }
162
163 static void
164 stepmulti(void *data, curl_socket_t fd, int which)
165 {
166         int running_handles = 0;
167         CURLMcode msta;
168         
169         msta = curl_multi_socket_action(global.mhnd, fd, which, &running_handles);
170         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
171         if (msta)
172                 syslog(LOG_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta));
173         if (global.nrun != running_handles)
174                 gotstatus(running_handles);
175 }
176
177 static void
178 gottime(struct ev_loop *loop, ev_timer *timeev, int events) {
179         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
180         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
181 }
182
183 static void
184 got_in(struct ev_loop *loop, ev_io *ioev, int events) {
185         syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
186         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
187 }
188
189 static void
190 got_out(struct ev_loop *loop, ev_io *ioev, int events) {
191         syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
192         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
193 }
194
195 static size_t
196 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
197         AsyncIO *IO = (AsyncIO*) cglobal;
198
199         if (IO->HttpReq.ReplyData == NULL)
200         {
201             IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
202         }
203         return CurlFillStrBuf_callback(data, size, nmemb, IO->HttpReq.ReplyData);
204 }
205
206 static int
207 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
208         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
209         evcurl_global_data *global = cglobal;
210         ev_timer_stop(EV_DEFAULT, &global->timeev);
211         if (tblock_ms < 0 || 14000 < tblock_ms)
212                 tblock_ms = 14000;
213         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
214         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
215         curl_multi_perform(global, CURL_POLL_NONE);
216         return 0;
217 }
218
219 static int
220 gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) {
221         evcurl_global_data *global = cglobal;
222         CURLM *mhnd = global->mhnd;
223         char *f;
224         AsyncIO *IO = (AsyncIO*) vIO;
225         CURLcode sta;
226         const char *Action;
227
228         if (IO == NULL) {
229                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
230                 if (sta) {
231                         EV_syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
232                         return -1;
233                 }
234                 IO = (AsyncIO *) f;
235                 EV_syslog(LOG_DEBUG, "EVCURL: got socket for URL: %s\n", IO->ConnectMe->PlainUrl);
236                 if (IO->SendBuf.fd != 0)
237                 {
238                         ev_io_stop(event_base, &IO->recv_event);
239                         ev_io_stop(event_base, &IO->send_event);
240                 }
241                 IO->SendBuf.fd = fd;
242                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
243                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
244                 curl_multi_assign(mhnd, fd, IO);
245         }
246
247         Action = "";
248         switch (action)
249         {
250         case CURL_POLL_NONE:
251                 Action = "CURL_POLL_NONE";
252                 break;
253         case CURL_POLL_REMOVE:
254                 Action = "CURL_POLL_REMOVE";
255                 break;
256         case CURL_POLL_IN:
257                 Action = "CURL_POLL_IN";
258                 break;
259         case CURL_POLL_OUT:
260                 Action = "CURL_POLL_OUT";
261                 break;
262         case CURL_POLL_INOUT:
263                 Action = "CURL_POLL_INOUT";
264                 break;
265         }
266
267
268         EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", (int)fd, Action, action);
269
270         switch (action)
271         {
272         case CURL_POLL_NONE:
273                 EVM_syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n");
274                 break;
275         case CURL_POLL_REMOVE:
276                 EVM_syslog(LOG_ERR,"EVCURL: called last time to unregister this sockwatcher\n");
277                 ev_io_stop(event_base, &IO->recv_event);
278                 ev_io_stop(event_base, &IO->send_event);
279                 break;
280         case CURL_POLL_IN:
281                 ev_io_start(event_base, &IO->recv_event);
282                 ev_io_stop(event_base, &IO->send_event);
283                 break;
284         case CURL_POLL_OUT:
285                 ev_io_start(event_base, &IO->send_event);
286                 ev_io_stop(event_base, &IO->recv_event);
287                 break;
288         case CURL_POLL_INOUT:
289                 ev_io_start(event_base, &IO->send_event);
290                 ev_io_start(event_base, &IO->recv_event);
291                 break;
292         }
293         return 0;
294 }
295
296 void curl_init_connectionpool(void) 
297 {
298         CURLM *mhnd ;
299
300         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
301         global.timeev.data = (void *)&global;
302         global.nrun = -1;
303         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
304
305         if (sta) 
306         {
307                 syslog(LOG_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta));
308                 exit(1);
309         }
310         mhnd = global.mhnd = curl_multi_init();
311         if (!mhnd)
312         {
313                 syslog(LOG_ERR,"EVCURL: error initializing curl multi handle\n");
314                 exit(3);
315         }
316
317         MOPT(SOCKETFUNCTION, &gotwatchsock);
318         MOPT(SOCKETDATA, (void *)&global);
319         MOPT(TIMERFUNCTION, &gotwatchtime);
320         MOPT(TIMERDATA, (void *)&global);
321
322         return;
323 }
324
325
326
327
328 int evcurl_init(AsyncIO *IO, 
329                 void *CustomData, 
330                 const char* Desc,
331                 IO_CallBack CallBack, 
332                 IO_CallBack Terminate)
333 {
334         CURLcode sta;
335         CURL *chnd;
336
337         EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
338         IO->HttpReq.attached = 0;
339         IO->SendDone = CallBack;
340         IO->Terminate = Terminate;
341         chnd = IO->HttpReq.chnd = curl_easy_init();
342         if (!chnd)
343         {
344                 EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
345                 return 1;
346         }
347
348         strcpy(IO->HttpReq.errdesc, Desc);
349
350         OPT(VERBOSE, (long)1);
351                 /* unset in production */
352         OPT(NOPROGRESS, (long)1); 
353         OPT(NOSIGNAL, (long)1);
354         OPT(FAILONERROR, (long)1);
355         OPT(ENCODING, "");
356         OPT(FOLLOWLOCATION, (long)0);
357         OPT(MAXREDIRS, (long)7);
358         OPT(USERAGENT, CITADEL);
359
360         OPT(TIMEOUT, (long)1800);
361         OPT(LOW_SPEED_LIMIT, (long)64);
362         OPT(LOW_SPEED_TIME, (long)600);
363         OPT(CONNECTTIMEOUT, (long)600); 
364         OPT(PRIVATE, (void *)IO);
365
366         OPT(FORBID_REUSE, 1);
367         OPT(WRITEFUNCTION, &gotdata); 
368         OPT(WRITEDATA, (void *)IO);
369         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
370
371         if (
372                 (!IsEmptyStr(config.c_ip_addr))
373                 && (strcmp(config.c_ip_addr, "*"))
374                 && (strcmp(config.c_ip_addr, "::"))
375                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
376         ) {
377                 OPT(INTERFACE, config.c_ip_addr);
378         }
379                 /* point to a structure that points back to the perl structure and stuff */
380         EV_syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
381         OPT(URL, IO->ConnectMe->PlainUrl);
382         if (StrLength(IO->ConnectMe->CurlCreds))
383         {
384                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
385                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
386         }
387 #ifdef CURLOPT_HTTP_CONTENT_DECODING
388         OPT(HTTP_CONTENT_DECODING, 1);
389         OPT(ENCODING, "");
390 #endif
391         if (StrLength(IO->HttpReq.PostData) > 0)
392         { 
393                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
394                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
395
396         }
397         else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL))
398         {
399                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
400                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
401         }
402
403         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Connection: close");
404         OPT(HTTPHEADER, IO->HttpReq.headers);
405
406         return 1;
407 }
408
409 eNextState
410 evcurl_handle_start(AsyncIO *IO) 
411 {
412         CURLMcode msta;
413         IO->NextState = eConnect;
414         EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
415         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
416         if (msta)
417                 EV_syslog(LOG_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta));
418         IO->HttpReq.attached = 1;
419         ev_async_send (event_base, &WakeupCurl);
420         return eReadMessage;
421 }
422
423 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
424 {
425         syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
426
427         curl_multi_perform(&global, CURL_POLL_NONE);
428 }
429
430 static void evcurl_shutdown (void)
431 {
432         curl_multi_cleanup(global.mhnd);
433 }
/*****************************************************************************
 *                       libev integration                                   *
 *****************************************************************************/
437 /*
438  * client event queue plus its methods.
439  * this currently is the main loop (which may change in some future?)
440  */
441 int evbase_count = 0;
442 int event_add_pipe[2] = {-1, -1};
443 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
444 HashList *QueueEvents = NULL;
445 HashList *InboundEventQueue = NULL;
446 HashList *InboundEventQueues[2] = { NULL, NULL };
447
448 ev_async AddJob;   
449 ev_async ExitEventLoop;
450
451 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
452 {
453         HashList *q;
454         void *v;
455         HashPos  *It;
456         long len;
457         const char *Key;
458
459         /* get the control command... */
460         pthread_mutex_lock(&EventQueueMutex);
461
462         if (InboundEventQueues[0] == InboundEventQueue) {
463                 InboundEventQueue = InboundEventQueues[1];
464                 q = InboundEventQueues[0];
465         }
466         else {
467                 InboundEventQueue = InboundEventQueues[0];
468                 q = InboundEventQueues[1];
469         }
470         pthread_mutex_unlock(&EventQueueMutex);
471
472         It = GetNewHashPos(q, 0);
473         while (GetNextHashPos(q, It, &len, &Key, &v))
474         {
475                 IOAddHandler *h = v;
476                 if (h->IO->ID == 0)
477                         h->IO->ID = EvIDSource++;
478                 h->EvAttch(h->IO);
479         }
480         DeleteHashPos(&It);
481         DeleteHashContent(&q);
482         syslog(LOG_DEBUG, "EVENT Q Add done.\n");
483 }
484
485
486 static void EventExitCallback(EV_P_ ev_async *w, int revents)
487 {
488         ev_break(event_base, EVBREAK_ALL);
489
490         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
491 }
492
493
494
495 void InitEventQueue(void)
496 {
497         struct rlimit LimitSet;
498
499         pthread_mutex_init(&EventQueueMutex, NULL);
500
501         if (pipe(event_add_pipe) != 0) {
502                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
503                 abort();
504         }
505         LimitSet.rlim_cur = 1;
506         LimitSet.rlim_max = 1;
507         setrlimit(event_add_pipe[1], &LimitSet);
508
509         QueueEvents = NewHash(1, Flathash);
510         InboundEventQueues[0] = NewHash(1, Flathash);
511         InboundEventQueues[1] = NewHash(1, Flathash);
512         InboundEventQueue = InboundEventQueues[0];
513 }
514 /*
515  * this thread operates the select() etc. via libev.
516  * 
517  * 
518  */
519 void *client_event_thread(void *arg) 
520 {
521         struct CitContext libev_client_CC;
522
523         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
524 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
525         syslog(LOG_DEBUG, "client_ev_thread() initializing\n");
526
527         event_base = ev_default_loop (EVFLAG_AUTO);
528
529         ev_async_init(&AddJob, QueueEventAddCallback);
530         ev_async_start(event_base, &AddJob);
531         ev_async_init(&ExitEventLoop, EventExitCallback);
532         ev_async_start(event_base, &ExitEventLoop);
533         ev_async_init(&WakeupCurl, WakeupCurlCallback);
534         ev_async_start(event_base, &WakeupCurl);
535
536         curl_init_connectionpool();
537
538         ev_run (event_base, 0);
539
540
541 ///what todo here?      CtdlClearSystemContext();
542         ev_loop_destroy (EV_DEFAULT_UC);
543         
544         DeleteHash(&QueueEvents);
545         InboundEventQueue = NULL;
546         DeleteHash(&InboundEventQueues[0]);
547         DeleteHash(&InboundEventQueues[1]);
548 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
549         evcurl_shutdown();
550
551         return(NULL);
552 }
553 /*------------------------------------------------------------------------------*/
554 /*
555  * DB-Queue; does async bdb operations.
556  * has its own set of handlers.
557  */
558 ev_loop *event_db;
559 int evdb_count = 0;
560 int evdb_add_pipe[2] = {-1, -1};
561 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
562 HashList *DBQueueEvents = NULL;
563 HashList *DBInboundEventQueue = NULL;
564 HashList *DBInboundEventQueues[2] = { NULL, NULL };
565
566 ev_async DBAddJob;   
567 ev_async DBExitEventLoop;
568
569 extern void ShutDownDBCLient(AsyncIO *IO);
570
571 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
572 {
573         HashList *q;
574         void *v;
575         HashPos  *It;
576         long len;
577         const char *Key;
578
579         /* get the control command... */
580         pthread_mutex_lock(&DBEventQueueMutex);
581
582         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
583                 DBInboundEventQueue = DBInboundEventQueues[1];
584                 q = DBInboundEventQueues[0];
585         }
586         else {
587                 DBInboundEventQueue = DBInboundEventQueues[0];
588                 q = DBInboundEventQueues[1];
589         }
590         pthread_mutex_unlock(&DBEventQueueMutex);
591
592         It = GetNewHashPos(q, 0);
593         while (GetNextHashPos(q, It, &len, &Key, &v))
594         {
595                 IOAddHandler *h = v;
596                 eNextState rc;
597                 if (h->IO->ID == 0)
598                         h->IO->ID = EvIDSource++;
599                 rc = h->EvAttch(h->IO);
600                 switch (rc)
601                 {
602                 case eAbort:
603                     ShutDownDBCLient(h->IO);
604                 default:
605                     break;
606                 }
607         }
608         DeleteHashPos(&It);
609         DeleteHashContent(&q);
610         syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
611 }
612
613
614 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
615 {
616         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
617         ev_break(event_db, EVBREAK_ALL);
618 }
619
620
621
622 void DBInitEventQueue(void)
623 {
624         struct rlimit LimitSet;
625
626         pthread_mutex_init(&DBEventQueueMutex, NULL);
627
628         if (pipe(evdb_add_pipe) != 0) {
629                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
630                 abort();
631         }
632         LimitSet.rlim_cur = 1;
633         LimitSet.rlim_max = 1;
634         setrlimit(evdb_add_pipe[1], &LimitSet);
635
636         DBQueueEvents = NewHash(1, Flathash);
637         DBInboundEventQueues[0] = NewHash(1, Flathash);
638         DBInboundEventQueues[1] = NewHash(1, Flathash);
639         DBInboundEventQueue = DBInboundEventQueues[0];
640 }
641
642 /*
643  * this thread operates writing to the message database via libev.
644  * 
645  * 
646  */
647 void *db_event_thread(void *arg) 
648 {
649         struct CitContext libev_msg_CC;
650
651         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
652 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
653         syslog(LOG_DEBUG, "client_msgev_thread() initializing\n");
654
655         event_db = ev_loop_new (EVFLAG_AUTO);
656
657         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
658         ev_async_start(event_db, &DBAddJob);
659         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
660         ev_async_start(event_db, &DBExitEventLoop);
661
662         ev_run (event_db, 0);
663
664
665 //// what to do here?   CtdlClearSystemContext();
666         ev_loop_destroy (event_db);
667
668         DeleteHash(&DBQueueEvents);
669         DBInboundEventQueue = NULL;
670         DeleteHash(&DBInboundEventQueues[0]);
671         DeleteHash(&DBInboundEventQueues[1]);
672 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
673
674         return(NULL);
675 }
676
677 CTDL_MODULE_INIT(event_client)
678 {
679         if (!threading)
680         {
681                 InitEventQueue();
682                 DBInitEventQueue();
683                 CtdlThreadCreate(/*"Client event", */ client_event_thread);
684                 CtdlThreadCreate(/*"DB event", */db_event_thread);
685 /// todo register shutdown callback.
686         }
687         return "event";
688 }