01492e56c437900bed34673196dc484f2908a2a2
[citadel] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63 int DebugEventLoop = 0;
64 int DebugEventLoopBacktrace = 0;
65 int DebugCurl = 0;
66
67 long EvIDSource = 1;
68 /*****************************************************************************
69  *                   libevent / curl integration                             *
70  *****************************************************************************/
/*
 * Gate used by the logging macros below: every level except LOG_DEBUG is
 * always emitted, LOG_DEBUG only while DebugCurl is non-zero.
 */
#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0))

/*
 * The logging macros are wrapped in do { } while (0) so that they behave as
 * one statement; a bare "if" would capture an "else" following the macro.
 */

/* Log with the IDs of the IO context; needs an AsyncIO *IO in scope. */
#define EVCURL_syslog(LEVEL, FORMAT, ...)                               \
        do {                                                            \
                DBGLOG (LEVEL) syslog(LEVEL,                            \
                                      "EVCURL:IO[%ld]CC[%d] " FORMAT,   \
                                      IO->ID, CCID, __VA_ARGS__);       \
        } while (0)

/* Same as EVCURL_syslog() for messages without format arguments. */
#define EVCURLM_syslog(LEVEL, FORMAT)                                   \
        do {                                                            \
                DBGLOG (LEVEL) syslog(LEVEL,                            \
                                      "EVCURL:IO[%ld]CC[%d] " FORMAT,   \
                                      IO->ID, CCID);                    \
        } while (0)

/* Log without IO context. */
#define CURL_syslog(LEVEL, FORMAT, ...)                                 \
        do {                                                            \
                DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT,           \
                                      __VA_ARGS__);                     \
        } while (0)

/* Same as CURL_syslog() for messages without format arguments. */
#define CURLM_syslog(LEVEL, FORMAT)                                     \
        do {                                                            \
                DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT);          \
        } while (0)

/*
 * Set option CURLMOPT_<s> to v on the multi handle `mhnd`; the result is
 * stored in the caller's `sta`.  Any failure is logged and terminates the
 * process.  The result of curl_multi_setopt() is a CURLMcode, hence it is
 * rendered with curl_multi_strerror().
 */
#define MOPT(s, v)                                                      \
        do {                                                            \
                sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
                if (sta) {                                              \
                        EVQ_syslog(LOG_ERR, "error setting option "     \
                               #s " on curl multi handle: %s\n",        \
                               curl_multi_strerror((CURLMcode)sta));    \
                        exit (1);                                       \
                }                                                       \
        } while (0)
97
98
99 typedef struct _evcurl_global_data {
100         int magic;
101         CURLM *mhnd;
102         ev_timer timeev;
103         int nrun;
104 } evcurl_global_data;
105
106 ev_async WakeupCurl;
107 evcurl_global_data global;
108
109 static void
110 gotstatus(int nnrun)
111 {
112         CURLMsg *msg;
113         int nmsg;
114
115         global.nrun = nnrun;
116
117         CURLM_syslog(LOG_DEBUG,
118                      "gotstatus(): about to call curl_multi_info_read\n");
119         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
120                 CURL_syslog(LOG_DEBUG,
121                             "got curl multi_info message msg=%d\n",
122                             msg->msg);
123
124                 if (CURLMSG_DONE == msg->msg) {
125                         CURL *chnd;
126                         char *chandle = NULL;
127                         CURLcode sta;
128                         CURLMcode msta;
129                         AsyncIO*IO;
130
131                         chandle = NULL;;
132                         chnd = msg->easy_handle;
133                         sta = curl_easy_getinfo(chnd,
134                                                 CURLINFO_PRIVATE,
135                                                 &chandle);
136                         if (sta) {
137                                 syslog(LOG_ERR,
138                                        "error asking curl for private"
139                                        " cookie of curl handle: %s\n",
140                                        curl_easy_strerror(sta));
141                                 continue;
142                         }
143                         IO = (AsyncIO *)chandle;
144                         if (IO->ID == 0) {
145                                 EVCURL_syslog(LOG_ERR,
146                                               "Error, invalid IO context %p\n",
147                                               IO);
148                                 continue;
149                         }
150
151                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
152
153                         IO->Now = ev_now(event_base);
154
155                         ev_io_stop(event_base, &IO->recv_event);
156                         ev_io_stop(event_base, &IO->send_event);
157
158                         sta = msg->data.result;
159                         if (sta) {
160                                 EVCURL_syslog(LOG_ERR,
161                                               "error description: %s\n",
162                                               IO->HttpReq.errdesc);
163                                 EVCURL_syslog(LOG_ERR,
164                                               "error performing request: %s\n",
165                                               curl_easy_strerror(sta));
166                         }
167                         sta = curl_easy_getinfo(chnd,
168                                                 CURLINFO_RESPONSE_CODE,
169                                                 &IO->HttpReq.httpcode);
170                         if (sta)
171                                 EVCURL_syslog(LOG_ERR,
172                                               "error asking curl for "
173                                               "response code from request: %s\n",
174                                               curl_easy_strerror(sta));
175                         EVCURL_syslog(LOG_DEBUG,
176                                       "http response code was %ld\n",
177                                       (long)IO->HttpReq.httpcode);
178
179
180                         curl_slist_free_all(IO->HttpReq.headers);
181                         msta = curl_multi_remove_handle(global.mhnd, chnd);
182                         if (msta)
183                                 EVCURL_syslog(LOG_ERR,
184                                               "warning problem detaching "
185                                               "completed handle from curl multi: "
186                                               "%s\n",
187                                               curl_multi_strerror(msta));
188
189                        ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
190
191                         IO->HttpReq.attached = 0;
192                         switch(IO->SendDone(IO))
193                         {
194                         case eDBQuery:
195                                 curl_easy_cleanup(IO->HttpReq.chnd);
196                                 IO->HttpReq.chnd = NULL;
197                                 break;
198                         case eSendDNSQuery:
199                         case eReadDNSReply:
200                         case eConnect:
201                         case eSendReply:
202                         case eSendMore:
203                         case eSendFile:
204                         case eReadMessage:
205                         case eReadMore:
206                         case eReadPayload:
207                         case eReadFile:
208                                 curl_easy_cleanup(IO->HttpReq.chnd);
209                                 IO->HttpReq.chnd = NULL;
210                                 break;
211                         case eTerminateConnection:
212                         case eAbort:
213                                 curl_easy_cleanup(IO->HttpReq.chnd);
214                                 IO->HttpReq.chnd = NULL;
215                                 FreeStrBuf(&IO->HttpReq.ReplyData);
216                                 FreeURL(&IO->ConnectMe);
217                                 RemoveContext(IO->CitContext);
218                                 IO->Terminate(IO);
219                         }
220                 }
221         }
222 }
223
224 static void
225 stepmulti(void *data, curl_socket_t fd, int which)
226 {
227         int running_handles = 0;
228         CURLMcode msta;
229
230         msta = curl_multi_socket_action(global.mhnd,
231                                         fd,
232                                         which,
233                                         &running_handles);
234
235         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
236         if (msta)
237                 CURL_syslog(LOG_ERR,
238                             "error in curl processing events"
239                             "on multi handle, fd %d: %s\n",
240                             (int)fd,
241                             curl_multi_strerror(msta));
242
243         if (global.nrun != running_handles)
244                 gotstatus(running_handles);
245 }
246
247 static void
248 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
249 {
250         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
251         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
252 }
253
254 static void
255 got_in(struct ev_loop *loop, ev_io *ioev, int events)
256 {
257         CURL_syslog(LOG_DEBUG,
258                     "EVCURL: waking up curl for io on fd %d\n",
259                     (int)ioev->fd);
260
261         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
262 }
263
264 static void
265 got_out(struct ev_loop *loop, ev_io *ioev, int events)
266 {
267         CURL_syslog(LOG_DEBUG,
268                     "waking up curl for io on fd %d\n",
269                     (int)ioev->fd);
270
271         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
272 }
273
274 static size_t
275 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
276         AsyncIO *IO = (AsyncIO*) cglobal;
277
278         if (IO->HttpReq.ReplyData == NULL)
279         {
280                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
281         }
282         IO->Now = ev_now(event_base);
283         return CurlFillStrBuf_callback(data,
284                                        size,
285                                        nmemb,
286                                        IO->HttpReq.ReplyData);
287 }
288
289 static int
290 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
291         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
292         evcurl_global_data *global = cglobal;
293         ev_timer_stop(EV_DEFAULT, &global->timeev);
294         if (tblock_ms < 0 || 14000 < tblock_ms)
295                 tblock_ms = 14000;
296         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
297         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
298         curl_multi_perform(global, &global->nrun);
299         return 0;
300 }
301
302 static int
303 gotwatchsock(CURL *easy,
304              curl_socket_t fd,
305              int action,
306              void *cglobal,
307              void *vIO)
308 {
309         evcurl_global_data *global = cglobal;
310         CURLM *mhnd = global->mhnd;
311         char *f;
312         AsyncIO *IO = (AsyncIO*) vIO;
313         CURLcode sta;
314         const char *Action;
315
316         if (IO == NULL) {
317                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
318                 if (sta) {
319                         CURL_syslog(LOG_ERR,
320                                     "EVCURL: error asking curl for private "
321                                     "cookie of curl handle: %s\n",
322                                     curl_easy_strerror(sta));
323                         return -1;
324                 }
325                 IO = (AsyncIO *) f;
326                 EVCURL_syslog(LOG_DEBUG,
327                               "EVCURL: got socket for URL: %s\n",
328                               IO->ConnectMe->PlainUrl);
329
330                 if (IO->SendBuf.fd != 0)
331                 {
332                         ev_io_stop(event_base, &IO->recv_event);
333                         ev_io_stop(event_base, &IO->send_event);
334                 }
335                 IO->SendBuf.fd = fd;
336                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
337                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
338                 curl_multi_assign(mhnd, fd, IO);
339         }
340
341         IO->Now = ev_now(event_base);
342
343         Action = "";
344         switch (action)
345         {
346         case CURL_POLL_NONE:
347                 Action = "CURL_POLL_NONE";
348                 break;
349         case CURL_POLL_REMOVE:
350                 Action = "CURL_POLL_REMOVE";
351                 break;
352         case CURL_POLL_IN:
353                 Action = "CURL_POLL_IN";
354                 break;
355         case CURL_POLL_OUT:
356                 Action = "CURL_POLL_OUT";
357                 break;
358         case CURL_POLL_INOUT:
359                 Action = "CURL_POLL_INOUT";
360                 break;
361         }
362
363
364         EVCURL_syslog(LOG_DEBUG,
365                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
366                       (int)fd, Action, action);
367
368         switch (action)
369         {
370         case CURL_POLL_NONE:
371                 EVCURLM_syslog(LOG_DEBUG,
372                                "called first time "
373                                "to register this sockwatcker\n");
374                 break;
375         case CURL_POLL_REMOVE:
376                 EVCURLM_syslog(LOG_DEBUG,
377                                "called last time to unregister "
378                                "this sockwatcher\n");
379                 ev_io_stop(event_base, &IO->recv_event);
380                 ev_io_stop(event_base, &IO->send_event);
381                 break;
382         case CURL_POLL_IN:
383                 ev_io_start(event_base, &IO->recv_event);
384                 ev_io_stop(event_base, &IO->send_event);
385                 break;
386         case CURL_POLL_OUT:
387                 ev_io_start(event_base, &IO->send_event);
388                 ev_io_stop(event_base, &IO->recv_event);
389                 break;
390         case CURL_POLL_INOUT:
391                 ev_io_start(event_base, &IO->send_event);
392                 ev_io_start(event_base, &IO->recv_event);
393                 break;
394         }
395         return 0;
396 }
397
398 void curl_init_connectionpool(void)
399 {
400         CURLM *mhnd ;
401
402         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
403         global.timeev.data = (void *)&global;
404         global.nrun = -1;
405         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
406
407         if (sta)
408         {
409                 CURL_syslog(LOG_ERR,
410                             "error initializing curl library: %s\n",
411                             curl_easy_strerror(sta));
412
413                 exit(1);
414         }
415         mhnd = global.mhnd = curl_multi_init();
416         if (!mhnd)
417         {
418                 CURLM_syslog(LOG_ERR,
419                              "error initializing curl multi handle\n");
420                 exit(3);
421         }
422
423         MOPT(SOCKETFUNCTION, &gotwatchsock);
424         MOPT(SOCKETDATA, (void *)&global);
425         MOPT(TIMERFUNCTION, &gotwatchtime);
426         MOPT(TIMERDATA, (void *)&global);
427
428         return;
429 }
430
431 int evcurl_init(AsyncIO *IO)
432 {
433         CURLcode sta;
434         CURL *chnd;
435
436         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
437         IO->HttpReq.attached = 0;
438         chnd = IO->HttpReq.chnd = curl_easy_init();
439         if (!chnd)
440         {
441                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
442                 return 0;
443         }
444
445 #if DEBUG
446         OPT(VERBOSE, (long)1);
447 #endif
448         OPT(NOPROGRESS, 1L);
449
450         OPT(NOSIGNAL, 1L);
451         OPT(FAILONERROR, (long)1);
452         OPT(ENCODING, "");
453         OPT(FOLLOWLOCATION, (long)0);
454         OPT(MAXREDIRS, (long)0);
455         OPT(USERAGENT, CITADEL);
456
457         OPT(TIMEOUT, (long)1800);
458         OPT(LOW_SPEED_LIMIT, (long)64);
459         OPT(LOW_SPEED_TIME, (long)600);
460         OPT(CONNECTTIMEOUT, (long)600);
461         OPT(PRIVATE, (void *)IO);
462
463         OPT(FORBID_REUSE, 1);
464         OPT(WRITEFUNCTION, &gotdata);
465         OPT(WRITEDATA, (void *)IO);
466         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
467
468         if ((!IsEmptyStr(config.c_ip_addr))
469                 && (strcmp(config.c_ip_addr, "*"))
470                 && (strcmp(config.c_ip_addr, "::"))
471                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
472                 )
473         {
474                 OPT(INTERFACE, config.c_ip_addr);
475         }
476
477 #ifdef CURLOPT_HTTP_CONTENT_DECODING
478         OPT(HTTP_CONTENT_DECODING, 1);
479         OPT(ENCODING, "");
480 #endif
481
482         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
483                                                 "Connection: close");
484
485         return 1;
486 }
487
488
489 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
490                                            ev_cleanup *watcher,
491                                            int revents)
492 {
493         CURLMcode msta;
494         AsyncIO *IO = watcher->data;
495
496         if (IO == NULL)
497                 return;
498         IO->Now = ev_now(event_base);
499         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
500
501         curl_slist_free_all(IO->HttpReq.headers);
502         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
503         if (msta)
504         {
505                 EVCURL_syslog(LOG_ERR,
506                               "EVCURL: warning problem detaching completed handle "
507                               "from curl multi: %s\n",
508                               curl_multi_strerror(msta));
509         }
510
511         curl_easy_cleanup(IO->HttpReq.chnd);
512         IO->HttpReq.chnd = NULL;
513         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
514         ev_io_stop(event_base, &IO->recv_event);
515         ev_io_stop(event_base, &IO->send_event);
516         assert(IO->ShutdownAbort);
517         IO->ShutdownAbort(IO);
518 }
519 eNextState
520 evcurl_handle_start(AsyncIO *IO)
521 {
522         CURLMcode msta;
523         CURLcode sta;
524         CURL *chnd;
525
526         chnd = IO->HttpReq.chnd;
527         EVCURL_syslog(LOG_DEBUG,
528                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
529         OPT(URL, IO->ConnectMe->PlainUrl);
530         if (StrLength(IO->ConnectMe->CurlCreds))
531         {
532                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
533                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
534         }
535         if (StrLength(IO->HttpReq.PostData) > 0)
536         {
537                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
538                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
539
540         }
541         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
542                  (IO->HttpReq.PlainPostData != NULL))
543         {
544                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
545                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
546         }
547         OPT(HTTPHEADER, IO->HttpReq.headers);
548
549         IO->NextState = eConnect;
550         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
551         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
552         if (msta)
553         {
554                 EVCURL_syslog(LOG_ERR,
555                           "EVCURL: error attaching to curl multi handle: %s\n",
556                           curl_multi_strerror(msta));
557         }
558
559         IO->HttpReq.attached = 1;
560         ev_async_send (event_base, &WakeupCurl);
561
562         ev_cleanup_init(&IO->abort_by_shutdown,
563                         IOcurl_abort_shutdown_callback);
564
565         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
566
567         return eReadMessage;
568 }
569
570 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
571 {
572         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
573
574         curl_multi_perform(&global, CURL_POLL_NONE);
575 }
576
577 static void evcurl_shutdown (void)
578 {
579         curl_global_cleanup();
580         curl_multi_cleanup(global.mhnd);
581         CURLM_syslog(LOG_DEBUG, "exiting\n");
582 }
583 /*****************************************************************************
584  *                       libevent integration                                *
585  *****************************************************************************/
586 /*
587  * client event queue plus its methods.
588  * this currently is the main loop (which may change in some future?)
589  */
590 int evbase_count = 0;
591 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
592 pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
593 HashList *QueueEvents = NULL;
594 HashList *InboundEventQueue = NULL;
595 HashList *InboundEventQueues[2] = { NULL, NULL };
596 extern void ShutDownCLient(AsyncIO *IO);
597
598 ev_async AddJob;
599 ev_async ExitEventLoop;
600
601 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
602 {
603         CitContext *Ctx;
604         ev_tstamp Now;
605         HashList *q;
606         void *v;
607         HashPos*It;
608         long len;
609         const char *Key;
610
611         /* get the control command... */
612         pthread_mutex_lock(&EventQueueMutex);
613
614         if (InboundEventQueues[0] == InboundEventQueue) {
615                 InboundEventQueue = InboundEventQueues[1];
616                 q = InboundEventQueues[0];
617         }
618         else {
619                 InboundEventQueue = InboundEventQueues[0];
620                 q = InboundEventQueues[1];
621         }
622         pthread_mutex_unlock(&EventQueueMutex);
623         Now = ev_now (event_base);
624         It = GetNewHashPos(q, 0);
625         while (GetNextHashPos(q, It, &len, &Key, &v))
626         {
627                 IOAddHandler *h = v;
628                 if (h->IO->ID == 0) {
629                         h->IO->ID = EvIDSource++;
630                 }
631                 if (h->IO->StartIO == 0.0)
632                         h->IO->StartIO = Now;
633
634                 Ctx = h->IO->CitContext;
635                 become_session(Ctx);
636
637                 h->IO->Now = Now;
638                 switch (h->EvAttch(h->IO))
639                 {
640                 case eReadMore:
641                 case eReadMessage:
642                 case eReadFile:
643                 case eSendReply:
644                 case eSendMore:
645                 case eReadPayload:
646                 case eSendFile:
647                 case eDBQuery:
648                 case eSendDNSQuery:
649                 case eReadDNSReply:
650                 case eConnect:
651                         break;
652                 case eTerminateConnection:
653                 case eAbort:
654                         ShutDownCLient(h->IO);
655                 break;
656                 }
657         }
658         DeleteHashPos(&It);
659         DeleteHashContent(&q);
660         EVQM_syslog(LOG_DEBUG, "EVENT Q Add done.\n");
661 }
662
663
664 static void EventExitCallback(EV_P_ ev_async *w, int revents)
665 {
666         ev_break(event_base, EVBREAK_ALL);
667
668         EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
669 }
670
671
672
673 void InitEventQueue(void)
674 {
675         pthread_mutex_init(&EventQueueMutex, NULL);
676         pthread_mutex_init(&EventExitQueueMutex, NULL);
677
678         QueueEvents = NewHash(1, Flathash);
679         InboundEventQueues[0] = NewHash(1, Flathash);
680         InboundEventQueues[1] = NewHash(1, Flathash);
681         InboundEventQueue = InboundEventQueues[0];
682 }
683 extern void CtdlDestroyEVCleanupHooks(void);
684
685 extern int EVQShutDown;
686 /*
687  * this thread operates the select() etc. via libev.
688  */
689 void *client_event_thread(void *arg) 
690 {
691         struct CitContext libev_client_CC;
692
693         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
694 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
695         EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");
696
697         event_base = ev_default_loop (EVFLAG_AUTO);
698         ev_async_init(&AddJob, QueueEventAddCallback);
699         ev_async_start(event_base, &AddJob);
700         ev_async_init(&ExitEventLoop, EventExitCallback);
701         ev_async_start(event_base, &ExitEventLoop);
702         ev_async_init(&WakeupCurl, WakeupCurlCallback);
703         ev_async_start(event_base, &WakeupCurl);
704
705         curl_init_connectionpool();
706
707         ev_run (event_base, 0);
708
709         EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
710
711 ///what todo here?      CtdlClearSystemContext();
712         pthread_mutex_lock(&EventExitQueueMutex);
713         ev_loop_destroy (EV_DEFAULT_UC);
714         event_base = NULL;
715         DeleteHash(&QueueEvents);
716         InboundEventQueue = NULL;
717         DeleteHash(&InboundEventQueues[0]);
718         DeleteHash(&InboundEventQueues[1]);
719 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
720         evcurl_shutdown();
721
722         CtdlDestroyEVCleanupHooks();
723
724         pthread_mutex_unlock(&EventExitQueueMutex);
725         EVQShutDown = 1;
726         return(NULL);
727 }
728
729 /*----------------------------------------------------------------------------*/
730 /*
731  * DB-Queue; does async bdb operations.
732  * has its own set of handlers.
733  */
734 ev_loop *event_db;
735 int evdb_count = 0;
736 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
737 pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
738 HashList *DBQueueEvents = NULL;
739 HashList *DBInboundEventQueue = NULL;
740 HashList *DBInboundEventQueues[2] = { NULL, NULL };
741
742 ev_async DBAddJob;
743 ev_async DBExitEventLoop;
744
745 extern void ShutDownDBCLient(AsyncIO *IO);
746
747 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
748 {
749         CitContext *Ctx;
750         ev_tstamp Now;
751         HashList *q;
752         void *v;
753         HashPos *It;
754         long len;
755         const char *Key;
756
757         /* get the control command... */
758         pthread_mutex_lock(&DBEventQueueMutex);
759
760         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
761                 DBInboundEventQueue = DBInboundEventQueues[1];
762                 q = DBInboundEventQueues[0];
763         }
764         else {
765                 DBInboundEventQueue = DBInboundEventQueues[0];
766                 q = DBInboundEventQueues[1];
767         }
768         pthread_mutex_unlock(&DBEventQueueMutex);
769
770         Now = ev_now (event_db);
771         It = GetNewHashPos(q, 0);
772         while (GetNextHashPos(q, It, &len, &Key, &v))
773         {
774                 IOAddHandler *h = v;
775                 eNextState rc;
776                 if (h->IO->ID == 0)
777                         h->IO->ID = EvIDSource++;
778                 if (h->IO->StartDB == 0.0)
779                         h->IO->StartDB = Now;
780                 h->IO->Now = Now;
781
782                 Ctx = h->IO->CitContext;
783                 become_session(Ctx);
784                 ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
785                 rc = h->EvAttch(h->IO);
786                 switch (rc)
787                 {
788                 case eAbort:
789                         ShutDownDBCLient(h->IO);
790                 default:
791                         break;
792                 }
793         }
794         DeleteHashPos(&It);
795         DeleteHashContent(&q);
796         EVQM_syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
797 }
798
799
800 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
801 {
802         EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
803         ev_break(event_db, EVBREAK_ALL);
804 }
805
806
807
808 void DBInitEventQueue(void)
809 {
810         pthread_mutex_init(&DBEventQueueMutex, NULL);
811         pthread_mutex_init(&DBEventExitQueueMutex, NULL);
812
813         DBQueueEvents = NewHash(1, Flathash);
814         DBInboundEventQueues[0] = NewHash(1, Flathash);
815         DBInboundEventQueues[1] = NewHash(1, Flathash);
816         DBInboundEventQueue = DBInboundEventQueues[0];
817 }
818
819 /*
820  * this thread operates writing to the message database via libev.
821  */
822 void *db_event_thread(void *arg)
823 {
824         ev_loop *tmp;
825         struct CitContext libev_msg_CC;
826
827         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
828
829         EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
830
831         tmp = event_db = ev_loop_new (EVFLAG_AUTO);
832
833         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
834         ev_async_start(event_db, &DBAddJob);
835         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
836         ev_async_start(event_db, &DBExitEventLoop);
837
838         ev_run (event_db, 0);
839
840         pthread_mutex_lock(&DBEventExitQueueMutex);
841
842         event_db = NULL;
843         EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n");
844
845         DeleteHash(&DBQueueEvents);
846         DBInboundEventQueue = NULL;
847         DeleteHash(&DBInboundEventQueues[0]);
848         DeleteHash(&DBInboundEventQueues[1]);
849
850 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
851
852         ev_loop_destroy (tmp);
853         pthread_mutex_unlock(&DBEventExitQueueMutex);
854         return(NULL);
855 }
856
857 void ShutDownEventQueues(void)
858 {
859         EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
860
861         pthread_mutex_lock(&DBEventQueueMutex);
862         ev_async_send (event_db, &DBExitEventLoop);
863         pthread_mutex_unlock(&DBEventQueueMutex);
864
865         pthread_mutex_lock(&EventQueueMutex);
866         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
867         pthread_mutex_unlock(&EventQueueMutex);
868 }
869
870 void DebugEventloopEnable(const int n)
871 {
872         DebugEventLoop = n;
873 }
874 void DebugEventloopBacktraceEnable(const int n)
875 {
876         DebugEventLoopBacktrace = n;
877 }
878
879 void DebugCurlEnable(const int n)
880 {
881         DebugCurl = n;
882 }
883
884 CTDL_MODULE_INIT(event_client)
885 {
886         if (!threading)
887         {
888                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
889                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
890                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
891                 InitEventQueue();
892                 DBInitEventQueue();
893                 CtdlThreadCreate(client_event_thread);
894                 CtdlThreadCreate(db_event_thread);
895         }
896         return "event";
897 }