2260446a29671af6db5cb85f81065629f7817a0f
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2012 by the citadel.org team
3  *
4  *  This program is open source software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License version 3.
6  *  
7  *  
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  
15  *  
16  *  
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <assert.h>
48 #include <arpa/inet.h>
49 #include <libcitadel.h>
50 #include <curl/curl.h>
51 #include <curl/multi.h>
52 #include "citadel.h"
53 #include "server.h"
54 #include "citserver.h"
55 #include "support.h"
56
57 #include "ctdl_module.h"
58
59 #include "event_client.h"
60 #include "serv_curl.h"
61
62 ev_loop *event_base;
63 int DebugEventLoop = 0;
64 int DebugEventLoopBacktrace = 0;
65 int DebugCurl = 0;
66 pthread_key_t evConKey;
67
68 long EvIDSource = 1;
69 /*****************************************************************************
70  *                   libevent / curl integration                             *
71  *****************************************************************************/
72 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0))
73
74 #define EVCURL_syslog(LEVEL, FORMAT, ...)                               \
75         DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT,    \
76                               IOSTR, IO->ID, CCID, __VA_ARGS__)
77
78 #define EVCURLM_syslog(LEVEL, FORMAT)                                   \
79         DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT,    \
80                               IOSTR, IO->ID, CCID)
81
82 #define CURL_syslog(LEVEL, FORMAT, ...)                                 \
83         DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__)
84
85 #define CURLM_syslog(LEVEL, FORMAT)                     \
86         DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT)
87
88 #define MOPT(s, v)                                                      \
89         do {                                                            \
90                 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
91                 if (sta) {                                              \
92                         EVQ_syslog(LOG_ERR, "error setting option "     \
93                                #s " on curl multi handle: %s\n",        \
94                                curl_easy_strerror(sta));                \
95                         exit (1);                                       \
96                 }                                                       \
97         } while (0)
98
99
100 typedef struct _evcurl_global_data {
101         int magic;
102         CURLM *mhnd;
103         ev_timer timeev;
104         int nrun;
105 } evcurl_global_data;
106
107 ev_async WakeupCurl;
108 evcurl_global_data global;
109
110 eNextState QueueAnDBOperation(AsyncIO *IO);
111
112 static void
113 gotstatus(int nnrun)
114 {
115         CURLMsg *msg;
116         int nmsg;
117
118         global.nrun = nnrun;
119
120         CURLM_syslog(LOG_DEBUG,
121                      "gotstatus(): about to call curl_multi_info_read\n");
122         while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
123                 CURL_syslog(LOG_DEBUG,
124                             "got curl multi_info message msg=%d\n",
125                             msg->msg);
126
127                 if (CURLMSG_DONE == msg->msg) {
128                         CURL *chnd;
129                         void *chandle = NULL;
130                         CURLcode sta;
131                         CURLMcode msta;
132                         AsyncIO*IO;
133
134                         chandle = NULL;;
135                         chnd = msg->easy_handle;
136                         sta = curl_easy_getinfo(chnd,
137                                                 CURLINFO_PRIVATE,
138                                                 &chandle);
139                         if (sta) {
140                                 syslog(LOG_ERR,
141                                        "error asking curl for private"
142                                        " cookie of curl handle: %s\n",
143                                        curl_easy_strerror(sta));
144                                 continue;
145                         }
146                         IO = (AsyncIO *)chandle;
147                         if (IO->ID == 0) {
148                                 EVCURL_syslog(LOG_ERR,
149                                               "Error, invalid IO context %p\n",
150                                               IO);
151                                 continue;
152                         }
153                         SetEVState(IO, eCurlGotStatus);
154
155                         EVCURLM_syslog(LOG_DEBUG, "request complete\n");
156
157                         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
158
159                         ev_io_stop(event_base, &IO->recv_event);
160                         ev_io_stop(event_base, &IO->send_event);
161
162                         sta = msg->data.result;
163                         if (sta) {
164                                 EVCURL_syslog(LOG_ERR,
165                                               "error description: %s\n",
166                                               IO->HttpReq.errdesc);
167                                 IO->HttpReq.CurlError = curl_easy_strerror(sta);
168                                 EVCURL_syslog(LOG_ERR,
169                                               "error performing request: %s\n",
170                                               IO->HttpReq.CurlError);
171                                 if (sta == CURLE_OPERATION_TIMEDOUT)
172                                 {
173                                         IO->SendBuf.fd = 0;
174                                         IO->RecvBuf.fd = 0;
175                                 }
176                         }
177                         sta = curl_easy_getinfo(chnd,
178                                                 CURLINFO_RESPONSE_CODE,
179                                                 &IO->HttpReq.httpcode);
180                         if (sta)
181                                 EVCURL_syslog(LOG_ERR,
182                                               "error asking curl for "
183                                               "response code from request: %s\n",
184                                               curl_easy_strerror(sta));
185                         EVCURL_syslog(LOG_DEBUG,
186                                       "http response code was %ld\n",
187                                       (long)IO->HttpReq.httpcode);
188
189
190                         curl_slist_free_all(IO->HttpReq.headers);
191                         IO->HttpReq.headers = NULL;
192                         msta = curl_multi_remove_handle(global.mhnd, chnd);
193                         if (msta)
194                                 EVCURL_syslog(LOG_ERR,
195                                               "warning problem detaching "
196                                               "completed handle from curl multi: "
197                                               "%s\n",
198                                               curl_multi_strerror(msta));
199
200                         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
201
202                         IO->HttpReq.attached = 0;
203                         switch(IO->SendDone(IO))
204                         {
205                         case eDBQuery:
206                                 FreeURL(&IO->ConnectMe);
207                                 QueueAnDBOperation(IO);
208                                 break;
209                         case eSendDNSQuery:
210                         case eReadDNSReply:
211                         case eConnect:
212                         case eSendReply:
213                         case eSendMore:
214                         case eSendFile:
215                         case eReadMessage:
216                         case eReadMore:
217                         case eReadPayload:
218                         case eReadFile:
219                                 break;
220                         case eTerminateConnection:
221                         case eAbort:
222                                 curl_easy_cleanup(IO->HttpReq.chnd);
223                                 IO->HttpReq.chnd = NULL;
224                                 FreeStrBuf(&IO->HttpReq.ReplyData);
225                                 FreeURL(&IO->ConnectMe);
226                                 RemoveContext(IO->CitContext);
227                                 IO->Terminate(IO);
228                         }
229                 }
230         }
231 }
232
233 static void
234 stepmulti(void *data, curl_socket_t fd, int which)
235 {
236         int running_handles = 0;
237         CURLMcode msta;
238
239         msta = curl_multi_socket_action(global.mhnd,
240                                         fd,
241                                         which,
242                                         &running_handles);
243
244         CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
245         if (msta)
246                 CURL_syslog(LOG_ERR,
247                             "error in curl processing events"
248                             "on multi handle, fd %d: %s\n",
249                             (int)fd,
250                             curl_multi_strerror(msta));
251
252         if (global.nrun != running_handles)
253                 gotstatus(running_handles);
254 }
255
256 static void
257 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
258 {
259         CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
260         stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
261 }
262
263 static void
264 got_in(struct ev_loop *loop, ev_io *ioev, int events)
265 {
266         CURL_syslog(LOG_DEBUG,
267                     "EVCURL: waking up curl for io on fd %d\n",
268                     (int)ioev->fd);
269
270         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
271 }
272
273 static void
274 got_out(struct ev_loop *loop, ev_io *ioev, int events)
275 {
276         CURL_syslog(LOG_DEBUG,
277                     "waking up curl for io on fd %d\n",
278                     (int)ioev->fd);
279
280         stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
281 }
282
283 static size_t
284 gotdata(void *data, size_t size, size_t nmemb, void *cglobal)
285 {
286         AsyncIO *IO = (AsyncIO*) cglobal;
287
288         SetEVState(IO, eCurlGotData);
289         if (IO->HttpReq.ReplyData == NULL)
290         {
291                 IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
292         }
293         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
294         return CurlFillStrBuf_callback(data,
295                                        size,
296                                        nmemb,
297                                        IO->HttpReq.ReplyData);
298 }
299
300 static int
301 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
302         CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
303         evcurl_global_data *global = cglobal;
304         ev_timer_stop(EV_DEFAULT, &global->timeev);
305         if (tblock_ms < 0 || 14000 < tblock_ms)
306                 tblock_ms = 14000;
307         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
308         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
309         curl_multi_perform(global, &global->nrun);
310         return 0;
311 }
312
313 static int
314 gotwatchsock(CURL *easy,
315              curl_socket_t fd,
316              int action,
317              void *cglobal,
318              void *vIO)
319 {
320         evcurl_global_data *global = cglobal;
321         CURLM *mhnd = global->mhnd;
322         void *f;
323         AsyncIO *IO = (AsyncIO*) vIO;
324         CURLcode sta;
325         const char *Action;
326
327         if (IO == NULL) {
328                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
329                 if (sta) {
330                         CURL_syslog(LOG_ERR,
331                                     "EVCURL: error asking curl for private "
332                                     "cookie of curl handle: %s\n",
333                                     curl_easy_strerror(sta));
334                         return -1;
335                 }
336                 IO = (AsyncIO *) f;
337                 SetEVState(IO, eCurlNewIO);
338                 EVCURL_syslog(LOG_DEBUG,
339                               "EVCURL: got socket for URL: %s\n",
340                               IO->ConnectMe->PlainUrl);
341
342                 if (IO->SendBuf.fd != 0)
343                 {
344                         ev_io_stop(event_base, &IO->recv_event);
345                         ev_io_stop(event_base, &IO->send_event);
346                 }
347                 IO->SendBuf.fd = fd;
348                 ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
349                 ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
350                 curl_multi_assign(mhnd, fd, IO);
351         }
352
353         SetEVState(IO, eCurlGotIO);
354         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
355
356         Action = "";
357         switch (action)
358         {
359         case CURL_POLL_NONE:
360                 Action = "CURL_POLL_NONE";
361                 break;
362         case CURL_POLL_REMOVE:
363                 Action = "CURL_POLL_REMOVE";
364                 break;
365         case CURL_POLL_IN:
366                 Action = "CURL_POLL_IN";
367                 break;
368         case CURL_POLL_OUT:
369                 Action = "CURL_POLL_OUT";
370                 break;
371         case CURL_POLL_INOUT:
372                 Action = "CURL_POLL_INOUT";
373                 break;
374         }
375
376
377         EVCURL_syslog(LOG_DEBUG,
378                       "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
379                       (int)fd, Action, action);
380
381         switch (action)
382         {
383         case CURL_POLL_NONE:
384                 EVCURLM_syslog(LOG_DEBUG,
385                                "called first time "
386                                "to register this sockwatcker\n");
387                 break;
388         case CURL_POLL_REMOVE:
389                 EVCURLM_syslog(LOG_DEBUG,
390                                "called last time to unregister "
391                                "this sockwatcher\n");
392                 ev_io_stop(event_base, &IO->recv_event);
393                 ev_io_stop(event_base, &IO->send_event);
394                 break;
395         case CURL_POLL_IN:
396                 ev_io_start(event_base, &IO->recv_event);
397                 ev_io_stop(event_base, &IO->send_event);
398                 break;
399         case CURL_POLL_OUT:
400                 ev_io_start(event_base, &IO->send_event);
401                 ev_io_stop(event_base, &IO->recv_event);
402                 break;
403         case CURL_POLL_INOUT:
404                 ev_io_start(event_base, &IO->send_event);
405                 ev_io_start(event_base, &IO->recv_event);
406                 break;
407         }
408         return 0;
409 }
410
411 void curl_init_connectionpool(void)
412 {
413         CURLM *mhnd ;
414
415         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
416         global.timeev.data = (void *)&global;
417         global.nrun = -1;
418         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
419
420         if (sta)
421         {
422                 CURL_syslog(LOG_ERR,
423                             "error initializing curl library: %s\n",
424                             curl_easy_strerror(sta));
425
426                 exit(1);
427         }
428         mhnd = global.mhnd = curl_multi_init();
429         if (!mhnd)
430         {
431                 CURLM_syslog(LOG_ERR,
432                              "error initializing curl multi handle\n");
433                 exit(3);
434         }
435
436         MOPT(SOCKETFUNCTION, &gotwatchsock);
437         MOPT(SOCKETDATA, (void *)&global);
438         MOPT(TIMERFUNCTION, &gotwatchtime);
439         MOPT(TIMERDATA, (void *)&global);
440
441         return;
442 }
443
444 int evcurl_init(AsyncIO *IO)
445 {
446         CURLcode sta;
447         CURL *chnd;
448
449         EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
450         IO->HttpReq.attached = 0;
451         chnd = IO->HttpReq.chnd = curl_easy_init();
452         if (!chnd)
453         {
454                 EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
455                 return 0;
456         }
457
458 #if DEBUG
459         OPT(VERBOSE, (long)1);
460 #endif
461         OPT(NOPROGRESS, 1L);
462
463         OPT(NOSIGNAL, 1L);
464         OPT(FAILONERROR, (long)1);
465         OPT(ENCODING, "");
466         OPT(FOLLOWLOCATION, (long)0);
467         OPT(MAXREDIRS, (long)0);
468         OPT(USERAGENT, CITADEL);
469
470         OPT(TIMEOUT, (long)1800);
471         OPT(LOW_SPEED_LIMIT, (long)64);
472         OPT(LOW_SPEED_TIME, (long)600);
473         OPT(CONNECTTIMEOUT, (long)600);
474         OPT(PRIVATE, (void *)IO);
475
476         OPT(FORBID_REUSE, 1);
477         OPT(WRITEFUNCTION, &gotdata);
478         OPT(WRITEDATA, (void *)IO);
479         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
480
481         if ((!IsEmptyStr(config.c_ip_addr))
482                 && (strcmp(config.c_ip_addr, "*"))
483                 && (strcmp(config.c_ip_addr, "::"))
484                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
485                 )
486         {
487                 OPT(INTERFACE, config.c_ip_addr);
488         }
489
490 #ifdef CURLOPT_HTTP_CONTENT_DECODING
491         OPT(HTTP_CONTENT_DECODING, 1);
492         OPT(ENCODING, "");
493 #endif
494
495         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
496                                                 "Connection: close");
497
498         return 1;
499 }
500
501
502 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
503                                            ev_cleanup *watcher,
504                                            int revents)
505 {
506         CURLMcode msta;
507         AsyncIO *IO = watcher->data;
508
509         if (IO == NULL)
510                 return;
511
512         SetEVState(IO, eCurlShutdown);
513         IO->CitContext->lastcmd = IO->Now = ev_now(event_base);
514         EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
515
516         curl_slist_free_all(IO->HttpReq.headers);
517         IO->HttpReq.headers = NULL;
518         msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
519         if (msta)
520         {
521                 EVCURL_syslog(LOG_ERR,
522                               "EVCURL: warning problem detaching completed handle "
523                               "from curl multi: %s\n",
524                               curl_multi_strerror(msta));
525         }
526
527         curl_easy_cleanup(IO->HttpReq.chnd);
528         IO->HttpReq.chnd = NULL;
529         ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
530         ev_io_stop(event_base, &IO->recv_event);
531         ev_io_stop(event_base, &IO->send_event);
532         assert(IO->ShutdownAbort);
533         IO->ShutdownAbort(IO);
534 }
535
536 eNextState
537 evcurl_handle_start(AsyncIO *IO)
538 {
539         CURLMcode msta;
540         CURLcode sta;
541         CURL *chnd;
542
543         SetEVState(IO, eCurlStart);
544         chnd = IO->HttpReq.chnd;
545         EVCURL_syslog(LOG_DEBUG,
546                   "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
547         OPT(URL, IO->ConnectMe->PlainUrl);
548         if (StrLength(IO->ConnectMe->CurlCreds))
549         {
550                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
551                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
552         }
553         if (StrLength(IO->HttpReq.PostData) > 0)
554         {
555                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
556                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
557
558         }
559         else if ((IO->HttpReq.PlainPostDataLen != 0) &&
560                  (IO->HttpReq.PlainPostData != NULL))
561         {
562                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
563                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
564         }
565         OPT(HTTPHEADER, IO->HttpReq.headers);
566
567         IO->NextState = eConnect;
568         EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
569         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
570         if (msta)
571         {
572                 EVCURL_syslog(LOG_ERR,
573                           "EVCURL: error attaching to curl multi handle: %s\n",
574                           curl_multi_strerror(msta));
575         }
576
577         IO->HttpReq.attached = 1;
578         ev_async_send (event_base, &WakeupCurl);
579
580         ev_cleanup_init(&IO->abort_by_shutdown,
581                         IOcurl_abort_shutdown_callback);
582
583         ev_cleanup_start(event_base, &IO->abort_by_shutdown);
584
585         return eReadMessage;
586 }
587
588 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
589 {
590         CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
591
592         curl_multi_perform(&global, CURL_POLL_NONE);
593 }
594
595 static void evcurl_shutdown (void)
596 {
597         curl_global_cleanup();
598         curl_multi_cleanup(global.mhnd);
599         CURLM_syslog(LOG_DEBUG, "exiting\n");
600 }
601 /*****************************************************************************
602  *                       libevent integration                                *
603  *****************************************************************************/
604 /*
605  * client event queue plus its methods.
606  * this currently is the main loop (which may change in some future?)
607  */
608 int evbase_count = 0;
609 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
610 pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
611 HashList *QueueEvents = NULL;
612 HashList *InboundEventQueue = NULL;
613 HashList *InboundEventQueues[2] = { NULL, NULL };
614 extern void ShutDownCLient(AsyncIO *IO);
615
616 ev_async AddJob;
617 ev_async ExitEventLoop;
618
619 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
620 {
621         CitContext *Ctx;
622         long IOID = -1;
623         long count = 0;
624         ev_tstamp Now;
625         HashList *q;
626         void *v;
627         HashPos*It;
628         long len;
629         const char *Key;
630
631         /* get the control command... */
632         pthread_mutex_lock(&EventQueueMutex);
633
634         if (InboundEventQueues[0] == InboundEventQueue) {
635                 InboundEventQueue = InboundEventQueues[1];
636                 q = InboundEventQueues[0];
637         }
638         else {
639                 InboundEventQueue = InboundEventQueues[0];
640                 q = InboundEventQueues[1];
641         }
642         pthread_mutex_unlock(&EventQueueMutex);
643         Now = ev_now (event_base);
644         It = GetNewHashPos(q, 0);
645         while (GetNextHashPos(q, It, &len, &Key, &v))
646         {
647                 IOAddHandler *h = v;
648                 count ++;
649                 if (h->IO->ID == 0) {
650                         h->IO->ID = EvIDSource++;
651                 }
652                 IOID = h->IO->ID;
653                 if (h->IO->StartIO == 0.0)
654                         h->IO->StartIO = Now;
655
656                 SetEVState(h->IO, eIOAttach);
657
658                 Ctx = h->IO->CitContext;
659                 become_session(Ctx);
660
661                 h->IO->CitContext->lastcmd = h->IO->Now = Now;
662                 switch (h->EvAttch(h->IO))
663                 {
664                 case eReadMore:
665                 case eReadMessage:
666                 case eReadFile:
667                 case eSendReply:
668                 case eSendMore:
669                 case eReadPayload:
670                 case eSendFile:
671                 case eDBQuery:
672                 case eSendDNSQuery:
673                 case eReadDNSReply:
674                 case eConnect:
675                         break;
676                 case eTerminateConnection:
677                 case eAbort:
678                         ShutDownCLient(h->IO);
679                 break;
680                 }
681         }
682         DeleteHashPos(&It);
683         DeleteHashContent(&q);
684         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld  done.", IOSTR, IOID, count);
685 }
686
687
688 static void EventExitCallback(EV_P_ ev_async *w, int revents)
689 {
690         ev_break(event_base, EVBREAK_ALL);
691
692         EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
693 }
694
695
696
697 void InitEventQueue(void)
698 {
699         pthread_mutex_init(&EventQueueMutex, NULL);
700         pthread_mutex_init(&EventExitQueueMutex, NULL);
701
702         QueueEvents = NewHash(1, Flathash);
703         InboundEventQueues[0] = NewHash(1, Flathash);
704         InboundEventQueues[1] = NewHash(1, Flathash);
705         InboundEventQueue = InboundEventQueues[0];
706 }
707 extern void CtdlDestroyEVCleanupHooks(void);
708
709 extern int EVQShutDown;
710 const char *IOLog = "IO";
711 /*
712  * this thread operates the select() etc. via libev.
713  */
714 void *client_event_thread(void *arg) 
715 {
716         struct CitContext libev_client_CC;
717
718         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
719
720         pthread_setspecific(evConKey, IOLog);
721
722         EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");
723
724         event_base = ev_default_loop (EVFLAG_AUTO);
725         ev_async_init(&AddJob, QueueEventAddCallback);
726         ev_async_start(event_base, &AddJob);
727         ev_async_init(&ExitEventLoop, EventExitCallback);
728         ev_async_start(event_base, &ExitEventLoop);
729         ev_async_init(&WakeupCurl, WakeupCurlCallback);
730         ev_async_start(event_base, &WakeupCurl);
731
732         curl_init_connectionpool();
733
734         ev_run (event_base, 0);
735
736         EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
737
738 ///what todo here?      CtdlClearSystemContext();
739         pthread_mutex_lock(&EventExitQueueMutex);
740         ev_loop_destroy (EV_DEFAULT_UC);
741         event_base = NULL;
742         DeleteHash(&QueueEvents);
743         InboundEventQueue = NULL;
744         DeleteHash(&InboundEventQueues[0]);
745         DeleteHash(&InboundEventQueues[1]);
746 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
747         evcurl_shutdown();
748
749         CtdlDestroyEVCleanupHooks();
750
751         pthread_mutex_unlock(&EventExitQueueMutex);
752         EVQShutDown = 1;
753         return(NULL);
754 }
755
756 /*----------------------------------------------------------------------------*/
757 /*
758  * DB-Queue; does async bdb operations.
759  * has its own set of handlers.
760  */
761 ev_loop *event_db;
762 int evdb_count = 0;
763 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
764 pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
765 HashList *DBQueueEvents = NULL;
766 HashList *DBInboundEventQueue = NULL;
767 HashList *DBInboundEventQueues[2] = { NULL, NULL };
768
769 ev_async DBAddJob;
770 ev_async DBExitEventLoop;
771
772 extern void ShutDownDBCLient(AsyncIO *IO);
773
774 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
775 {
776         CitContext *Ctx;
777         long IOID = -1;
778         long count = 0;;
779         ev_tstamp Now;
780         HashList *q;
781         void *v;
782         HashPos *It;
783         long len;
784         const char *Key;
785
786         /* get the control command... */
787         pthread_mutex_lock(&DBEventQueueMutex);
788
789         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
790                 DBInboundEventQueue = DBInboundEventQueues[1];
791                 q = DBInboundEventQueues[0];
792         }
793         else {
794                 DBInboundEventQueue = DBInboundEventQueues[0];
795                 q = DBInboundEventQueues[1];
796         }
797         pthread_mutex_unlock(&DBEventQueueMutex);
798
799         Now = ev_now (event_db);
800         It = GetNewHashPos(q, 0);
801         while (GetNextHashPos(q, It, &len, &Key, &v))
802         {
803                 IOAddHandler *h = v;
804                 eNextState rc;
805                 count ++;
806                 if (h->IO->ID == 0)
807                         h->IO->ID = EvIDSource++;
808                 IOID = h->IO->ID;
809                 if (h->IO->StartDB == 0.0)
810                         h->IO->StartDB = Now;
811                 h->IO->CitContext->lastcmd = h->IO->Now = Now;
812
813                 SetEVState(h->IO, eDBAttach);
814                 Ctx = h->IO->CitContext;
815                 become_session(Ctx);
816                 ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
817                 rc = h->EvAttch(h->IO);
818                 switch (rc)
819                 {
820                 case eAbort:
821                         ShutDownDBCLient(h->IO);
822                 default:
823                         break;
824                 }
825         }
826         DeleteHashPos(&It);
827         DeleteHashContent(&q);
828         EVQ_syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count);
829 }
830
831
832 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
833 {
834         EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
835         ev_break(event_db, EVBREAK_ALL);
836 }
837
838
839
840 void DBInitEventQueue(void)
841 {
842         pthread_mutex_init(&DBEventQueueMutex, NULL);
843         pthread_mutex_init(&DBEventExitQueueMutex, NULL);
844
845         DBQueueEvents = NewHash(1, Flathash);
846         DBInboundEventQueues[0] = NewHash(1, Flathash);
847         DBInboundEventQueues[1] = NewHash(1, Flathash);
848         DBInboundEventQueue = DBInboundEventQueues[0];
849 }
850
851 const char *DBLog = "BD";
852
853 /*
854  * this thread operates writing to the message database via libev.
855  */
856 void *db_event_thread(void *arg)
857 {
858         ev_loop *tmp;
859         struct CitContext libev_msg_CC;
860
861         pthread_setspecific(evConKey, DBLog);
862
863         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
864
865         EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
866
867         tmp = event_db = ev_loop_new (EVFLAG_AUTO);
868
869         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
870         ev_async_start(event_db, &DBAddJob);
871         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
872         ev_async_start(event_db, &DBExitEventLoop);
873
874         ev_run (event_db, 0);
875
876         pthread_mutex_lock(&DBEventExitQueueMutex);
877
878         event_db = NULL;
879         EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n");
880
881         DeleteHash(&DBQueueEvents);
882         DBInboundEventQueue = NULL;
883         DeleteHash(&DBInboundEventQueues[0]);
884         DeleteHash(&DBInboundEventQueues[1]);
885
886 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
887
888         ev_loop_destroy (tmp);
889         pthread_mutex_unlock(&DBEventExitQueueMutex);
890         return(NULL);
891 }
892
893 void ShutDownEventQueues(void)
894 {
895         EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
896
897         pthread_mutex_lock(&DBEventQueueMutex);
898         ev_async_send (event_db, &DBExitEventLoop);
899         pthread_mutex_unlock(&DBEventQueueMutex);
900
901         pthread_mutex_lock(&EventQueueMutex);
902         ev_async_send (EV_DEFAULT_ &ExitEventLoop);
903         pthread_mutex_unlock(&EventQueueMutex);
904 }
905
906 void DebugEventloopEnable(const int n)
907 {
908         DebugEventLoop = n;
909 }
910 void DebugEventloopBacktraceEnable(const int n)
911 {
912         DebugEventLoopBacktrace = n;
913 }
914
915 void DebugCurlEnable(const int n)
916 {
917         DebugCurl = n;
918 }
919
920 const char *WLog = "WX";
921 CTDL_MODULE_INIT(event_client)
922 {
923         if (!threading)
924         {
925                 if (pthread_key_create(&evConKey, NULL) != 0) {
926                         syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno));
927                 }
928                 pthread_setspecific(evConKey, WLog);
929
930                 CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
931                 CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
932                 CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
933                 InitEventQueue();
934                 DBInitEventQueue();
935                 CtdlThreadCreate(client_event_thread);
936                 CtdlThreadCreate(db_event_thread);
937         }
938         return "event";
939 }