d8c878309c67fcfe86a5a5cd0fdbe50fef550595
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
/* libev loop run by client_event_thread(); reset to NULL there during teardown. */
ev_loop *event_base;
/* Debug switches, written by the Debug*Enable() setters further down. */
int DebugEventLoop = 0;
int DebugEventLoopBacktrace = 0;
/* Non-zero lets LOG_DEBUG messages through the DBGLOG() filter below. */
int DebugCurl = 0;
/* Thread-specific key; each thread stores a short tag string under it. */
pthread_key_t evConKey;

/* Next ID handed to an AsyncIO whose ID is still 0 when it gets queued. */
long EvIDSource = 1;
69 /*****************************************************************************
70  *                   libevent / curl integration                             *
71  *****************************************************************************/
/*
 * Log filter: LOG_DEBUG messages are only emitted while DebugCurl is set,
 * every other level always passes.
 *
 * NOTE: this expands to a bare `if`, so the *_syslog() macros below must be
 * used as complete statements and never directly in front of an `else`.
 */
#define DBGLOG(LEVEL) if (((LEVEL) != LOG_DEBUG) || (DebugCurl != 0))

/* Per-request logging; needs a local `AsyncIO *IO` in scope (IO->ID, CCID). */
#define EVCURL_syslog(LEVEL, FORMAT, ...)                               \
        DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT,    \
                              IOSTR, IO->ID, CCID, __VA_ARGS__)

/* Same as EVCURL_syslog() for messages without format arguments. */
#define EVCURLM_syslog(LEVEL, FORMAT)                                   \
        DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT,    \
                              IOSTR, IO->ID, CCID)

/* Logging for code paths that have no AsyncIO at hand. */
#define CURL_syslog(LEVEL, FORMAT, ...)                                 \
        DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__)

/* Same as CURL_syslog() for messages without format arguments. */
#define CURLM_syslog(LEVEL, FORMAT)                     \
        DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT)

/*
 * Set an option on the curl multi handle or terminate the process.
 * Expects locals `mhnd` (the CURLM handle) and `sta` (receives the result).
 * curl_multi_setopt() returns a CURLMcode, so the message has to be produced
 * by curl_multi_strerror(); curl_easy_strerror() would describe an unrelated
 * CURLcode that merely shares the numeric value.
 */
#define MOPT(s, v)                                                      \
        do {                                                            \
                sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
                if (sta) {                                              \
                        EVQ_syslog(LOG_ERR, "error setting option "     \
                               #s " on curl multi handle: %s\n",        \
                               curl_multi_strerror((CURLMcode)sta));    \
                        exit (1);                                       \
                }                                                       \
        } while (0)
98
99
/* State shared by all curl transfers that run on the event loop. */
typedef struct _evcurl_global_data {
        int magic;              /* not written anywhere in this file */
        CURLM *mhnd;            /* the one curl multi handle */
        ev_timer timeev;        /* timer libcurl asks for via gotwatchtime() */
        int nrun;               /* running-handle count last seen by gotstatus() */
} evcurl_global_data;

/* Async watcher signalled by evcurl_handle_start() after attaching a request. */
ev_async WakeupCurl;
/* Single instance; also handed to libcurl as SOCKETDATA / TIMERDATA. */
evcurl_global_data global;

/* Defined elsewhere; called from gotstatus() when SendDone() returns eDBQuery. */
eNextState QueueAnDBOperation(AsyncIO *IO);
111
112 static void
113 gotstatus(int nnrun)
114 {
115         CURLMsg *msg;
116         int nmsg;
117
118         global.nrun = nnrun;
119
120         CURLM_syslog(LOG_DEBUG,
121                      "gotstatus(): about to call curl_multi_info_read\n");
122         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
123                 CURL_syslog(LOG_DEBUG,
124                             "got curl multi_info message msg=%d\n",
125                             msg->msg);
126
127                 if (CURLMSG_DONE == msg->msg) {
128                         CURL *chnd;
129                         char *chandle = NULL;
130                         CURLcode sta;
131                         CURLMcode msta;
132                         AsyncIO*IO;
133
134                         chandle = NULL;;
135                         chnd = msg->easy_handle;
136                         sta = curl_easy_getinfo(chnd,
137                                                 CURLINFO_PRIVATE,
138                                                 &chandle);
139                         if (sta) {
140                                 syslog(LOG_ERR,
141                                        "error asking curl for private"
142                                        " cookie of curl handle: %s\n",
143                                        curl_easy_strerror(sta));
144                                 continue;
145                         }
146                         IO = (AsyncIO *)chandle;
147                         if (IO->ID == 0) {
148                                 EVCURL_syslog(LOG_ERR,
149                                               "Error, invalid IO context %p\n",
150                                               IO);
151                                 continue;
152                         }
153                         SetEVState(IO, eCurlGotStatus);
154
155                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
156
157                         IO->Now = ev_now(event_base);
158
159                         ev_io_stop(event_base, &IO->recv_event);
160                         ev_io_stop(event_base, &IO->send_event);
161
162                         sta = msg->data.result;
163                         if (sta) {
164                                 EVCURL_syslog(LOG_ERR,
165                                               "error description: %s\n",
166                                               IO->HttpReq.errdesc);
167                                 IO->HttpReq.CurlError = curl_easy_strerror(sta);
168                                 EVCURL_syslog(LOG_ERR,
169                                               "error performing request: %s\n",
170                                               IO->HttpReq.CurlError);
171                                 if (sta == CURLE_OPERATION_TIMEDOUT)
172                                 {
173                                         IO->SendBuf.fd = 0;
174                                         IO->RecvBuf.fd = 0;
175                                 }
176                         }
177                         sta = curl_easy_getinfo(chnd,
178                                                 CURLINFO_RESPONSE_CODE,
179                                                 &IO->HttpReq.httpcode);
180                         if (sta)
181                                 EVCURL_syslog(LOG_ERR,
182                                               "error asking curl for "
183                                               "response code from request: %s\n",
184                                               curl_easy_strerror(sta));
185                         EVCURL_syslog(LOG_DEBUG,
186                                       "http response code was %ld\n",
187                                       (long)IO->HttpReq.httpcode);
188
189
190                         curl_slist_free_all(IO->HttpReq.headers);
191                         msta = curl_multi_remove_handle(global.mhnd, chnd);
192                         if (msta)
193                                 EVCURL_syslog(LOG_ERR,
194                                               "warning problem detaching "
195                                               "completed handle from curl multi: "
196                                               "%s\n",
197                                               curl_multi_strerror(msta));
198
199                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
200
201                         IO->HttpReq.attached = 0;
202                         switch(IO->SendDone(IO))
203                         {
204                         case eDBQuery:
205                                 FreeURL(&IO->ConnectMe);
206                                 QueueAnDBOperation(IO);
207                                 break;
208                         case eSendDNSQuery:
209                         case eReadDNSReply:
210                         case eConnect:
211                         case eSendReply:
212                         case eSendMore:
213                         case eSendFile:
214                         case eReadMessage:
215                         case eReadMore:
216                         case eReadPayload:
217                         case eReadFile:
218                                 break;
219                         case eTerminateConnection:
220                         case eAbort:
221                                 curl_easy_cleanup(IO->HttpReq.chnd);
222                                 IO->HttpReq.chnd = NULL;
223                                 FreeStrBuf(&IO->HttpReq.ReplyData);
224                                 FreeURL(&IO->ConnectMe);
225                                 RemoveContext(IO->CitContext);
226                                 IO->Terminate(IO);
227                         }
228                 }
229         }
230 }
231
232 static void
233 stepmulti(void *data, curl_socket_t fd, int which)
234 {
235         int running_handles = 0;
236         CURLMcode msta;
237
238         msta = curl_multi_socket_action(global.mhnd,
239                                         fd,
240                                         which,
241                                         &running_handles);
242
243         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
244         if (msta)
245                 CURL_syslog(LOG_ERR,
246                             "error in curl processing events"
247                             "on multi handle, fd %d: %s\n",
248                             (int)fd,
249                             curl_multi_strerror(msta));
250
251         if (global.nrun != running_handles)
252                 gotstatus(running_handles);
253 }
254
255 static void
256 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
257 {
258         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
259         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
260 }
261
262 static void
263 got_in(struct ev_loop *loop, ev_io *ioev, int events)
264 {
265         CURL_syslog(LOG_DEBUG,
266                     "EVCURL: waking up curl for io on fd %d\n",
267                     (int)ioev->fd);
268
269         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
270 }
271
272 static void
273 got_out(struct ev_loop *loop, ev_io *ioev, int events)
274 {
275         CURL_syslog(LOG_DEBUG,
276                     "waking up curl for io on fd %d\n",
277                     (int)ioev->fd);
278
279         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
280 }
281
282 static size_t
283 gotdata(void *data, size_t size, size_t nmemb, void *cglobal)
284 {
285         AsyncIO *IO = (AsyncIO*) cglobal;
286
287         SetEVState(IO, eCurlGotData);
288         if (IO->HttpReq.ReplyData == NULL)
289         {
290                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
291         }
292         IO->Now = ev_now(event_base);
293         return CurlFillStrBuf_callback(data,
294                                        size,
295                                        nmemb,
296                                        IO->HttpReq.ReplyData);
297 }
298
299 static int
300 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
301         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
302         evcurl_global_data *global = cglobal;
303         ev_timer_stop(EV_DEFAULT, &global->timeev);
304         if (tblock_ms < 0 || 14000 < tblock_ms)
305                 tblock_ms = 14000;
306         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
307         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
308         curl_multi_perform(global, &global->nrun);
309         return 0;
310 }
311
312 static int
313 gotwatchsock(CURL *easy,
314              curl_socket_t fd,
315              int action,
316              void *cglobal,
317              void *vIO)
318 {
319         evcurl_global_data *global = cglobal;
320         CURLM *mhnd = global->mhnd;
321         char *f;
322         AsyncIO *IO = (AsyncIO*) vIO;
323         CURLcode sta;
324         const char *Action;
325
326         if (IO == NULL) {
327                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
328                 if (sta) {
329                         CURL_syslog(LOG_ERR,
330                                     "EVCURL: error asking curl for private "
331                                     "cookie of curl handle: %s\n",
332                                     curl_easy_strerror(sta));
333                         return -1;
334                 }
335                 IO = (AsyncIO *) f;
336                 SetEVState(IO, eCurlNewIO);
337                 EVCURL_syslog(LOG_DEBUG,
338                               "EVCURL: got socket for URL: %s\n",
339                               IO->ConnectMe->PlainUrl);
340
341                 if (IO->SendBuf.fd != 0)
342                 {
343                         ev_io_stop(event_base, &IO->recv_event);
344                         ev_io_stop(event_base, &IO->send_event);
345                 }
346                 IO->SendBuf.fd = fd;
347                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
348                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
349                 curl_multi_assign(mhnd, fd, IO);
350         }
351
352         SetEVState(IO, eCurlGotIO);
353         IO->Now = ev_now(event_base);
354
355         Action = "";
356         switch (action)
357         {
358         case CURL_POLL_NONE:
359                 Action = "CURL_POLL_NONE";
360                 break;
361         case CURL_POLL_REMOVE:
362                 Action = "CURL_POLL_REMOVE";
363                 break;
364         case CURL_POLL_IN:
365                 Action = "CURL_POLL_IN";
366                 break;
367         case CURL_POLL_OUT:
368                 Action = "CURL_POLL_OUT";
369                 break;
370         case CURL_POLL_INOUT:
371                 Action = "CURL_POLL_INOUT";
372                 break;
373         }
374
375
376         EVCURL_syslog(LOG_DEBUG,
377                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
378                       (int)fd, Action, action);
379
380         switch (action)
381         {
382         case CURL_POLL_NONE:
383                 EVCURLM_syslog(LOG_DEBUG,
384                                "called first time "
385                                "to register this sockwatcker\n");
386                 break;
387         case CURL_POLL_REMOVE:
388                 EVCURLM_syslog(LOG_DEBUG,
389                                "called last time to unregister "
390                                "this sockwatcher\n");
391                 ev_io_stop(event_base, &IO->recv_event);
392                 ev_io_stop(event_base, &IO->send_event);
393                 break;
394         case CURL_POLL_IN:
395                 ev_io_start(event_base, &IO->recv_event);
396                 ev_io_stop(event_base, &IO->send_event);
397                 break;
398         case CURL_POLL_OUT:
399                 ev_io_start(event_base, &IO->send_event);
400                 ev_io_stop(event_base, &IO->recv_event);
401                 break;
402         case CURL_POLL_INOUT:
403                 ev_io_start(event_base, &IO->send_event);
404                 ev_io_start(event_base, &IO->recv_event);
405                 break;
406         }
407         return 0;
408 }
409
410 void curl_init_connectionpool(void)
411 {
412         CURLM *mhnd ;
413
414         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
415         global.timeev.data = (void *)&global;
416         global.nrun = -1;
417         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
418
419         if (sta)
420         {
421                 CURL_syslog(LOG_ERR,
422                             "error initializing curl library: %s\n",
423                             curl_easy_strerror(sta));
424
425                 exit(1);
426         }
427         mhnd = global.mhnd = curl_multi_init();
428         if (!mhnd)
429         {
430                 CURLM_syslog(LOG_ERR,
431                              "error initializing curl multi handle\n");
432                 exit(3);
433         }
434
435         MOPT(SOCKETFUNCTION, &gotwatchsock);
436         MOPT(SOCKETDATA, (void *)&global);
437         MOPT(TIMERFUNCTION, &gotwatchtime);
438         MOPT(TIMERDATA, (void *)&global);
439
440         return;
441 }
442
443 int evcurl_init(AsyncIO *IO)
444 {
445         CURLcode sta;
446         CURL *chnd;
447
448         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
449         IO->HttpReq.attached = 0;
450         chnd = IO->HttpReq.chnd = curl_easy_init();
451         if (!chnd)
452         {
453                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
454                 return 0;
455         }
456
457 #if DEBUG
458         OPT(VERBOSE, (long)1);
459 #endif
460         OPT(NOPROGRESS, 1L);
461
462         OPT(NOSIGNAL, 1L);
463         OPT(FAILONERROR, (long)1);
464         OPT(ENCODING, "");
465         OPT(FOLLOWLOCATION, (long)0);
466         OPT(MAXREDIRS, (long)0);
467         OPT(USERAGENT, CITADEL);
468
469         OPT(TIMEOUT, (long)1800);
470         OPT(LOW_SPEED_LIMIT, (long)64);
471         OPT(LOW_SPEED_TIME, (long)600);
472         OPT(CONNECTTIMEOUT, (long)600);
473         OPT(PRIVATE, (void *)IO);
474
475         OPT(FORBID_REUSE, 1);
476         OPT(WRITEFUNCTION, &gotdata);
477         OPT(WRITEDATA, (void *)IO);
478         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
479
480         if ((!IsEmptyStr(config.c_ip_addr))
481                 && (strcmp(config.c_ip_addr, "*"))
482                 && (strcmp(config.c_ip_addr, "::"))
483                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
484                 )
485         {
486                 OPT(INTERFACE, config.c_ip_addr);
487         }
488
489 #ifdef CURLOPT_HTTP_CONTENT_DECODING
490         OPT(HTTP_CONTENT_DECODING, 1);
491         OPT(ENCODING, "");
492 #endif
493
494         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
495                                                 "Connection: close");
496
497         return 1;
498 }
499
500
501 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
502                                            ev_cleanup *watcher,
503                                            int revents)
504 {
505         CURLMcode msta;
506         AsyncIO *IO = watcher->data;
507
508         if (IO == NULL)
509                 return;
510
511         SetEVState(IO, eCurlShutdown);
512         IO->Now = ev_now(event_base);
513         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
514
515         curl_slist_free_all(IO->HttpReq.headers);
516         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
517         if (msta)
518         {
519                 EVCURL_syslog(LOG_ERR,
520                               "EVCURL: warning problem detaching completed handle "
521                               "from curl multi: %s\n",
522                               curl_multi_strerror(msta));
523         }
524
525         curl_easy_cleanup(IO->HttpReq.chnd);
526         IO->HttpReq.chnd = NULL;
527         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
528         ev_io_stop(event_base, &IO->recv_event);
529         ev_io_stop(event_base, &IO->send_event);
530         assert(IO->ShutdownAbort);
531         IO->ShutdownAbort(IO);
532 }
533 eNextState
534 evcurl_handle_start(AsyncIO *IO)
535 {
536         CURLMcode msta;
537         CURLcode sta;
538         CURL *chnd;
539
540         SetEVState(IO, eCurlStart);
541         chnd = IO->HttpReq.chnd;
542         EVCURL_syslog(LOG_DEBUG,
543                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
544         OPT(URL, IO->ConnectMe->PlainUrl);
545         if (StrLength(IO->ConnectMe->CurlCreds))
546         {
547                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
548                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
549         }
550         if (StrLength(IO->HttpReq.PostData) > 0)
551         {
552                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
553                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
554
555         }
556         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
557                  (IO->HttpReq.PlainPostData != NULL))
558         {
559                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
560                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
561         }
562         OPT(HTTPHEADER, IO->HttpReq.headers);
563
564         IO->NextState = eConnect;
565         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
566         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
567         if (msta)
568         {
569                 EVCURL_syslog(LOG_ERR,
570                           "EVCURL: error attaching to curl multi handle: %s\n",
571                           curl_multi_strerror(msta));
572         }
573
574         IO->HttpReq.attached = 1;
575         ev_async_send (event_base, &WakeupCurl);
576
577         ev_cleanup_init(&IO->abort_by_shutdown,
578                         IOcurl_abort_shutdown_callback);
579
580         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
581
582         return eReadMessage;
583 }
584
585 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
586 {
587         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
588
589         curl_multi_perform(&global, CURL_POLL_NONE);
590 }
591
592 static void evcurl_shutdown (void)
593 {
594         curl_global_cleanup();
595         curl_multi_cleanup(global.mhnd);
596         CURLM_syslog(LOG_DEBUG, "exiting\n");
597 }
598 /*****************************************************************************
599  *                       libevent integration                                *
600  *****************************************************************************/
601 /*
602  * client event queue plus its methods.
603  * this currently is the main loop (which may change in some future?)
604  */
int evbase_count = 0;
pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
HashList *QueueEvents = NULL;
/* Points at one of the two InboundEventQueues[] entries; QueueEventAddCallback() flips it. */
HashList *InboundEventQueue = NULL;
HashList *InboundEventQueues[2] = { NULL, NULL };
/* Defined elsewhere; called when an attach handler returns eTerminateConnection or eAbort. */
extern void ShutDownCLient(AsyncIO *IO);

/* Async watchers of the client loop: queued-job notification and loop exit. */
ev_async AddJob;
ev_async ExitEventLoop;
615
616 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
617 {
618         CitContext *Ctx;
619         long IOID = -1;
620         long count = 0;
621         ev_tstamp Now;
622         HashList *q;
623         void *v;
624         HashPos*It;
625         long len;
626         const char *Key;
627
628         /* get the control command... */
629         pthread_mutex_lock(&EventQueueMutex);
630
631         if (InboundEventQueues[0] == InboundEventQueue) {
632                 InboundEventQueue = InboundEventQueues[1];
633                 q = InboundEventQueues[0];
634         }
635         else {
636                 InboundEventQueue = InboundEventQueues[0];
637                 q = InboundEventQueues[1];
638         }
639         pthread_mutex_unlock(&EventQueueMutex);
640         Now = ev_now (event_base);
641         It = GetNewHashPos(q, 0);
642         while (GetNextHashPos(q, It, &len, &Key, &v))
643         {
644                 IOAddHandler *h = v;
645                 count ++;
646                 if (h->IO->ID == 0) {
647                         h->IO->ID = EvIDSource++;
648                 }
649                 IOID = h->IO->ID;
650                 if (h->IO->StartIO == 0.0)
651                         h->IO->StartIO = Now;
652
653                 SetEVState(h->IO, eIOAttach);
654
655                 Ctx = h->IO->CitContext;
656                 become_session(Ctx);
657
658                 h->IO->Now = Now;
659                 switch (h->EvAttch(h->IO))
660                 {
661                 case eReadMore:
662                 case eReadMessage:
663                 case eReadFile:
664                 case eSendReply:
665                 case eSendMore:
666                 case eReadPayload:
667                 case eSendFile:
668                 case eDBQuery:
669                 case eSendDNSQuery:
670                 case eReadDNSReply:
671                 case eConnect:
672                         break;
673                 case eTerminateConnection:
674                 case eAbort:
675                         ShutDownCLient(h->IO);
676                 break;
677                 }
678         }
679         DeleteHashPos(&It);
680         DeleteHashContent(&q);
681         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld  done.", IOSTR, IOID, count);
682 }
683
684
685 static void EventExitCallback(EV_P_ ev_async *w, int revents)
686 {
687         ev_break(event_base, EVBREAK_ALL);
688
689         EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
690 }
691
692
693
694 void InitEventQueue(void)
695 {
696         pthread_mutex_init(&EventQueueMutex, NULL);
697         pthread_mutex_init(&EventExitQueueMutex, NULL);
698
699         QueueEvents = NewHash(1, Flathash);
700         InboundEventQueues[0] = NewHash(1, Flathash);
701         InboundEventQueues[1] = NewHash(1, Flathash);
702         InboundEventQueue = InboundEventQueues[0];
703 }
/* Defined elsewhere; called during teardown of the client event thread. */
extern void CtdlDestroyEVCleanupHooks(void);

/* Defined elsewhere; set to 1 once client_event_thread() has finished. */
extern int EVQShutDown;
/* Tag stored under evConKey for the client event thread. */
const char *IOLog = "IO";
/*
 * this thread operates the select() etc. via libev.
 *
 * Sets up the default libev loop with the three async watchers (job queue,
 * exit request, curl wake-up), initialises curl, runs the loop until
 * EventExitCallback() breaks it, and then tears everything down while
 * holding EventExitQueueMutex.  arg is unused; always returns NULL.
 */
void *client_event_thread(void *arg) 
{
        struct CitContext libev_client_CC;

        CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");

        pthread_setspecific(evConKey, IOLog);

        EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");

        event_base = ev_default_loop (EVFLAG_AUTO);
        ev_async_init(&AddJob, QueueEventAddCallback);
        ev_async_start(event_base, &AddJob);
        ev_async_init(&ExitEventLoop, EventExitCallback);
        ev_async_start(event_base, &ExitEventLoop);
        ev_async_init(&WakeupCurl, WakeupCurlCallback);
        ev_async_start(event_base, &WakeupCurl);

        curl_init_connectionpool();

        /* blocks until ev_break() is called */
        ev_run (event_base, 0);

        EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");

        /* TODO: unclear whether CtdlClearSystemContext() should be called here */
        pthread_mutex_lock(&EventExitQueueMutex);
        ev_loop_destroy (EV_DEFAULT_UC);
        event_base = NULL;
        DeleteHash(&QueueEvents);
        InboundEventQueue = NULL;
        DeleteHash(&InboundEventQueues[0]);
        DeleteHash(&InboundEventQueues[1]);
/*      citthread_mutex_destroy(&EventQueueMutex); TODO */
        evcurl_shutdown();

        CtdlDestroyEVCleanupHooks();

        pthread_mutex_unlock(&EventExitQueueMutex);
        EVQShutDown = 1;
        return(NULL);
}
752
753 /*----------------------------------------------------------------------------*/
754 /*
755  * DB-Queue; does async bdb operations.
756  * has its own set of handlers.
757  */
/* libev loop created and run by db_event_thread(); reset to NULL there on exit. */
ev_loop *event_db;
int evdb_count = 0;
pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
HashList *DBQueueEvents = NULL;
/* Points at one of the two DBInboundEventQueues[] entries; DBQueueEventAddCallback() flips it. */
HashList *DBInboundEventQueue = NULL;
HashList *DBInboundEventQueues[2] = { NULL, NULL };

/* Async watchers of the DB loop: queued-job notification and loop exit. */
ev_async DBAddJob;
ev_async DBExitEventLoop;

/* Defined elsewhere; called when a DB attach handler returns eAbort. */
extern void ShutDownDBCLient(AsyncIO *IO);
770
771 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
772 {
773         CitContext *Ctx;
774         long IOID = -1;
775         long count = 0;;
776         ev_tstamp Now;
777         HashList *q;
778         void *v;
779         HashPos *It;
780         long len;
781         const char *Key;
782
783         /* get the control command... */
784         pthread_mutex_lock(&DBEventQueueMutex);
785
786         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
787                 DBInboundEventQueue = DBInboundEventQueues[1];
788                 q = DBInboundEventQueues[0];
789         }
790         else {
791                 DBInboundEventQueue = DBInboundEventQueues[0];
792                 q = DBInboundEventQueues[1];
793         }
794         pthread_mutex_unlock(&DBEventQueueMutex);
795
796         Now = ev_now (event_db);
797         It = GetNewHashPos(q, 0);
798         while (GetNextHashPos(q, It, &len, &Key, &v))
799         {
800                 IOAddHandler *h = v;
801                 eNextState rc;
802                 count ++;
803                 if (h->IO->ID == 0)
804                         h->IO->ID = EvIDSource++;
805                 IOID = h->IO->ID;
806                 if (h->IO->StartDB == 0.0)
807                         h->IO->StartDB = Now;
808                 h->IO->Now = Now;
809
810                 SetEVState(h->IO, eDBAttach);
811                 Ctx = h->IO->CitContext;
812                 become_session(Ctx);
813                 ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
814                 rc = h->EvAttch(h->IO);
815                 switch (rc)
816                 {
817                 case eAbort:
818                         ShutDownDBCLient(h->IO);
819                 default:
820                         break;
821                 }
822         }
823         DeleteHashPos(&It);
824         DeleteHashContent(&q);
825         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count);
826 }
827
828
829 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
830 {
831         EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
832         ev_break(event_db, EVBREAK_ALL);
833 }
834
835
836
837 void DBInitEventQueue(void)
838 {
839         pthread_mutex_init(&DBEventQueueMutex, NULL);
840         pthread_mutex_init(&DBEventExitQueueMutex, NULL);
841
842         DBQueueEvents = NewHash(1, Flathash);
843         DBInboundEventQueues[0] = NewHash(1, Flathash);
844         DBInboundEventQueues[1] = NewHash(1, Flathash);
845         DBInboundEventQueue = DBInboundEventQueues[0];
846 }
847
/* Tag stored under evConKey for the DB event thread. */
const char *DBLog = "BD";

/*
 * this thread operates writing to the message database via libev.
 *
 * Creates its own libev loop with the two async watchers (job queue, exit
 * request), runs it until DBEventExitCallback() breaks it, and then tears
 * the queue state down while holding DBEventExitQueueMutex.  arg is unused;
 * always returns NULL.
 */
void *db_event_thread(void *arg)
{
        ev_loop *tmp;
        struct CitContext libev_msg_CC;

        pthread_setspecific(evConKey, DBLog);

        CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");

        EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");

        /* tmp keeps the loop reachable after event_db is cleared below */
        tmp = event_db = ev_loop_new (EVFLAG_AUTO);

        ev_async_init(&DBAddJob, DBQueueEventAddCallback);
        ev_async_start(event_db, &DBAddJob);
        ev_async_init(&DBExitEventLoop, DBEventExitCallback);
        ev_async_start(event_db, &DBExitEventLoop);

        /* blocks until ev_break() is called */
        ev_run (event_db, 0);

        pthread_mutex_lock(&DBEventExitQueueMutex);

        event_db = NULL;
        EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n");

        DeleteHash(&DBQueueEvents);
        DBInboundEventQueue = NULL;
        DeleteHash(&DBInboundEventQueues[0]);
        DeleteHash(&DBInboundEventQueues[1]);

/*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */

        ev_loop_destroy (tmp);
        pthread_mutex_unlock(&DBEventExitQueueMutex);
        return(NULL);
}
889
890 void ShutDownEventQueues(void)
891 {
892         EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
893
894         pthread_mutex_lock(&DBEventQueueMutex);
895         ev_async_send (event_db, &DBExitEventLoop);
896         pthread_mutex_unlock(&DBEventQueueMutex);
897
898         pthread_mutex_lock(&EventQueueMutex);
899         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
900         pthread_mutex_unlock(&EventQueueMutex);
901 }
902
903 void DebugEventloopEnable(const int n)
904 {
905         DebugEventLoop = n;
906 }
907 void DebugEventloopBacktraceEnable(const int n)
908 {
909         DebugEventLoopBacktrace = n;
910 }
911
912 void DebugCurlEnable(const int n)
913 {
914         DebugCurl = n;
915 }
916
917 const char *WLog = "WX";
918 CTDL_MODULE_INIT(event_client)
919 {
920         if (!threading)
921         {
922                 if (pthread_key_create(&evConKey, NULL) != 0) {
923                         syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno));
924                 }
925                 pthread_setspecific(evConKey, WLog);
926
927                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
928                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
929                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
930                 InitEventQueue();
931                 DBInitEventQueue();
932                 CtdlThreadCreate(client_event_thread);
933                 CtdlThreadCreate(db_event_thread);
934         }
935         return "event";
936 }