More places to remove the experimental #if
[citadel] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include <curl/curl.h>
50 #include <curl/multi.h>
51 #include "citadel.h"
52 #include "server.h"
53 #include "citserver.h"
54 #include "support.h"
55
56 #include "ctdl_module.h"
57
58 #include "event_client.h"
59 #include "serv_curl.h"
60
61 ev_loop *event_base;
62
63 /*****************************************************************************
64  *                   libevent / curl integration                             *
65  *****************************************************************************/
66 #define MOPT(s, v)                                                      \
67         do {                                                            \
68                 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
69                 if (sta) {                                              \
70                         syslog(LOG_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_easy_strerror(sta)); \
71                         exit (1);                                       \
72                 }                                                       \
73         } while (0)
74
75 typedef struct _evcurl_global_data {
76         int magic;
77         CURLM *mhnd;
78         ev_timer timeev;
79         int nrun;
80 } evcurl_global_data;
81
82 typedef struct _sockwatcher_data 
83 {
84         evcurl_global_data *global;
85         ev_io ioev;
86 } sockwatcher_data;
87
88 ev_async WakeupCurl;
89 evcurl_global_data global;
90
91 static void
92 gotstatus(evcurl_global_data *global, int nnrun) 
93 {
94         CURLM *mhnd;
95         CURLMsg *msg;
96         int nmsg;
97 /*
98   if (EVCURL_GLOBAL_MAGIC != global.magic)
99   {
100   CtdlLogPrintf(CTDL_ERR, "internal error: gotstatus on wrong struct");
101   return;
102   }
103 */
104         global->nrun = nnrun;
105         mhnd = global->mhnd;
106
107         syslog(LOG_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
108         while ((msg = curl_multi_info_read(mhnd, &nmsg))) {
109                 syslog(LOG_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg);
110                 if (CURLMSG_DONE == msg->msg) {
111                         CURL *chnd;
112                         char *chandle;
113                         CURLcode sta;
114                         CURLMcode msta;
115                         AsyncIO  *IO;
116
117                         chandle = NULL;;
118                         chnd = msg->easy_handle;
119                         sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle);
120                         syslog(LOG_ERR, "EVCURL: request complete\n");
121                         if (sta)
122                                 syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
123                         IO = (AsyncIO *)chandle;
124                         
125                         sta = msg->data.result;
126                         if (sta) {
127                                 syslog(LOG_ERR, "EVCURL: error description: %s\n", IO->HttpReq.errdesc);
128                                 syslog(LOG_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta));
129                         }
130                         sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &IO->HttpReq.httpcode);
131                         if (sta)
132                                 syslog(LOG_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta));
133                         syslog(LOG_ERR, "EVCURL: http response code was %ld\n", (long)IO->HttpReq.httpcode);
134                         msta = curl_multi_remove_handle(mhnd, chnd);
135                         if (msta)
136                                 syslog(LOG_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta));
137
138                         IO->HttpReq.attached = 0;
139                         IO->SendDone(IO);
140                         curl_easy_cleanup(IO->HttpReq.chnd);
141                         curl_slist_free_all(IO->HttpReq.headers);
142                         FreeStrBuf(&IO->HttpReq.ReplyData);
143                         FreeURL(&IO->ConnectMe);
144                         RemoveContext(IO->CitContext);
145                         IO->Terminate(IO);
146                 }
147         }
148 }
149
150 static void
151 stepmulti(evcurl_global_data *global, curl_socket_t fd) {
152         int nnrun;
153         CURLMcode msta;
154         
155         if (global == NULL) {
156             syslog(LOG_DEBUG, "EVCURL: stepmulti(NULL): wtf?\n");
157             return;
158         }
159         msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun);
160         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
161         if (msta)
162                 syslog(LOG_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta));
163         if (global->nrun != nnrun)
164                 gotstatus(global, nnrun);
165 }
166
167 static void
168 gottime(struct ev_loop *loop, ev_timer *timeev, int events) {
169         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
170         evcurl_global_data *global = (void *)timeev->data;
171         stepmulti(global, CURL_SOCKET_TIMEOUT);
172 }
173
174 static void
175 gotio(struct ev_loop *loop, ev_io *ioev, int events) {
176         syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
177         sockwatcher_data *sockwatcher = (void *)ioev->data;
178         stepmulti(sockwatcher->global, ioev->fd);
179 }
180
181 static size_t
182 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
183         AsyncIO *IO = (AsyncIO*) cglobal;
184         //evcurl_request_data *D = (evcurl_request_data*) data;
185         syslog(LOG_DEBUG, "EVCURL: gotdata(): calling CurlFillStrBuf_callback()\n");
186
187         if (IO->HttpReq.ReplyData == NULL)
188         {
189             IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
190         }
191         return CurlFillStrBuf_callback(data, size, nmemb, IO->HttpReq.ReplyData);
192 }
193
194 static int
195 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
196         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
197         evcurl_global_data *global = cglobal;
198         ev_timer_stop(EV_DEFAULT, &global->timeev);
199         if (tblock_ms < 0 || 14000 < tblock_ms)
200                 tblock_ms = 14000;
201         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
202         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
203         curl_multi_perform(global, CURL_POLL_NONE);
204         return 0;
205 }
206
207 static int
208 gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *csockwatcher) {
209         evcurl_global_data *global = cglobal;
210         CURLM *mhnd = global->mhnd;
211         sockwatcher_data *sockwatcher = csockwatcher;
212
213         syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
214
215         if (!sockwatcher) {
216                 syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n");
217                 sockwatcher = malloc(sizeof(sockwatcher_data));
218                 sockwatcher->global = global;
219                 ev_init(&sockwatcher->ioev, &gotio);
220                 sockwatcher->ioev.data = (void *)sockwatcher;
221                 curl_multi_assign(mhnd, fd, sockwatcher);
222         }
223         if (CURL_POLL_REMOVE == action) {
224                 syslog(LOG_ERR,"EVCURL: called last time to unregister this sockwatcher\n");
225                 ev_io_stop(event_base, &sockwatcher->ioev);
226                 free(sockwatcher);
227         } else {
228                 int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0);
229                 ev_io_stop(EV_DEFAULT, &sockwatcher->ioev);
230                 if (events) {
231                         ev_io_set(&sockwatcher->ioev, fd, events);
232                         ev_io_start(EV_DEFAULT, &sockwatcher->ioev);
233                 }
234         }
235         return 0;
236 }
237
238 void curl_init_connectionpool(void) 
239 {
240         CURLM *mhnd ;
241
242         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
243         global.timeev.data = (void *)&global;
244         global.nrun = -1;
245         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
246
247         if (sta) 
248         {
249                 syslog(LOG_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta));
250                 exit(1);
251         }
252         mhnd = global.mhnd = curl_multi_init();
253         if (!mhnd)
254         {
255                 syslog(LOG_ERR,"EVCURL: error initializing curl multi handle\n");
256                 exit(3);
257         }
258
259         MOPT(SOCKETFUNCTION, &gotwatchsock);
260         MOPT(SOCKETDATA, (void *)&global);
261         MOPT(TIMERFUNCTION, &gotwatchtime);
262         MOPT(TIMERDATA, (void *)&global);
263
264         return;
265 }
266
267
268
269
270 int evcurl_init(AsyncIO *IO, 
271                 void *CustomData, 
272                 const char* Desc,
273                 IO_CallBack CallBack, 
274                 IO_CallBack Terminate)
275 {
276         CURLcode sta;
277         CURL *chnd;
278
279         syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
280         IO->HttpReq.attached = 0;
281         IO->SendDone = CallBack;
282         IO->Terminate = Terminate;
283         chnd = IO->HttpReq.chnd = curl_easy_init();
284         if (!chnd)
285         {
286                 syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
287                 return 1;
288         }
289
290         strcpy(IO->HttpReq.errdesc, Desc);
291
292         OPT(VERBOSE, (long)1);
293                 /* unset in production */
294         OPT(NOPROGRESS, (long)1); 
295         OPT(NOSIGNAL, (long)1);
296         OPT(FAILONERROR, (long)1);
297         OPT(ENCODING, "");
298         OPT(FOLLOWLOCATION, (long)1);
299         OPT(MAXREDIRS, (long)7);
300         OPT(USERAGENT, CITADEL);
301
302         OPT(TIMEOUT, (long)1800);
303         OPT(LOW_SPEED_LIMIT, (long)64);
304         OPT(LOW_SPEED_TIME, (long)600);
305         OPT(CONNECTTIMEOUT, (long)600); 
306         OPT(PRIVATE, (void *)IO);
307
308
309         OPT(WRITEFUNCTION, &gotdata); 
310         OPT(WRITEDATA, (void *)IO);
311         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
312
313         if (
314                 (!IsEmptyStr(config.c_ip_addr))
315                 && (strcmp(config.c_ip_addr, "*"))
316                 && (strcmp(config.c_ip_addr, "::"))
317                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
318         ) {
319                 OPT(INTERFACE, config.c_ip_addr);
320         }
321                 /* point to a structure that points back to the perl structure and stuff */
322         syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
323         OPT(URL, IO->ConnectMe->PlainUrl);
324         if (StrLength(IO->ConnectMe->CurlCreds))
325         {
326                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
327                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
328         }
329 #ifdef CURLOPT_HTTP_CONTENT_DECODING
330         OPT(HTTP_CONTENT_DECODING, 1);
331         OPT(ENCODING, "");
332 #endif
333         if (StrLength(IO->HttpReq.PostData) > 0)
334         { 
335                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
336                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
337
338         }
339         else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL))
340         {
341                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
342                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
343         }
344
345         if (IO->HttpReq.headers != NULL)
346                 OPT(HTTPHEADER, IO->HttpReq.headers);
347
348         return 1;
349 }
350
351 void
352 evcurl_handle_start(AsyncIO *IO) 
353 {
354         CURLMcode msta;
355         
356         syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
357         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
358         if (msta)
359                 syslog(LOG_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta));
360         IO->HttpReq.attached = 1;
361         ev_async_send (event_base, &WakeupCurl);
362 }
363
364 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
365 {
366         syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
367
368         curl_multi_perform(&global, CURL_POLL_NONE);
369 }
370
371 static void evcurl_shutdown (void)
372 {
373         curl_multi_cleanup(global.mhnd);
374 }
375 /*****************************************************************************
376  *                       libevent integration                                *
377  *****************************************************************************/
378 /*
379  * client event queue plus its methods.
380  * this currently is the main loop (which may change in some future?)
381  */
382 int evbase_count = 0;
383 int event_add_pipe[2] = {-1, -1};
384 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
385 HashList *QueueEvents = NULL;
386 HashList *InboundEventQueue = NULL;
387 HashList *InboundEventQueues[2] = { NULL, NULL };
388
389 ev_async AddJob;   
390 ev_async ExitEventLoop;
391
392 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
393 {
394         HashList *q;
395         void *v;
396         HashPos  *It;
397         long len;
398         const char *Key;
399
400         /* get the control command... */
401         pthread_mutex_lock(&EventQueueMutex);
402
403         if (InboundEventQueues[0] == InboundEventQueue) {
404                 InboundEventQueue = InboundEventQueues[1];
405                 q = InboundEventQueues[0];
406         }
407         else {
408                 InboundEventQueue = InboundEventQueues[0];
409                 q = InboundEventQueues[1];
410         }
411         pthread_mutex_unlock(&EventQueueMutex);
412
413         It = GetNewHashPos(q, 0);
414         while (GetNextHashPos(q, It, &len, &Key, &v))
415         {
416                 IOAddHandler *h = v;
417                 h->EvAttch(h->IO);
418         }
419         DeleteHashPos(&It);
420         DeleteHashContent(&q);
421         syslog(LOG_DEBUG, "EVENT Q Read done.\n");
422 }
423
424
425 static void EventExitCallback(EV_P_ ev_async *w, int revents)
426 {
427         ev_break(event_base, EVBREAK_ALL);
428
429         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
430 }
431
432
433
434 void InitEventQueue(void)
435 {
436         struct rlimit LimitSet;
437
438         pthread_mutex_init(&EventQueueMutex, NULL);
439
440         if (pipe(event_add_pipe) != 0) {
441                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
442                 abort();
443         }
444         LimitSet.rlim_cur = 1;
445         LimitSet.rlim_max = 1;
446         setrlimit(event_add_pipe[1], &LimitSet);
447
448         QueueEvents = NewHash(1, Flathash);
449         InboundEventQueues[0] = NewHash(1, Flathash);
450         InboundEventQueues[1] = NewHash(1, Flathash);
451         InboundEventQueue = InboundEventQueues[0];
452 }
453 /*
454  * this thread operates the select() etc. via libev.
455  * 
456  * 
457  */
458 void *client_event_thread(void *arg) 
459 {
460         struct CitContext libev_client_CC;
461
462         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
463 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
464         syslog(LOG_DEBUG, "client_ev_thread() initializing\n");
465
466         event_base = ev_default_loop (EVFLAG_AUTO);
467
468         ev_async_init(&AddJob, QueueEventAddCallback);
469         ev_async_start(event_base, &AddJob);
470         ev_async_init(&ExitEventLoop, EventExitCallback);
471         ev_async_start(event_base, &ExitEventLoop);
472         ev_async_init(&WakeupCurl, WakeupCurlCallback);
473         ev_async_start(event_base, &WakeupCurl);
474
475         curl_init_connectionpool();
476
477         ev_run (event_base, 0);
478
479
480 ///what todo here?      CtdlClearSystemContext();
481         ev_loop_destroy (EV_DEFAULT_UC);
482         
483         DeleteHash(&QueueEvents);
484         InboundEventQueue = NULL;
485         DeleteHash(&InboundEventQueues[0]);
486         DeleteHash(&InboundEventQueues[1]);
487 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
488         evcurl_shutdown();
489
490         return(NULL);
491 }
492 /*------------------------------------------------------------------------------*/
493 /*
494  * DB-Queue; does async bdb operations.
495  * has its own set of handlers.
496  */
497 ev_loop *event_db;
498 int evdb_count = 0;
499 int evdb_add_pipe[2] = {-1, -1};
500 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
501 HashList *DBQueueEvents = NULL;
502 HashList *DBInboundEventQueue = NULL;
503 HashList *DBInboundEventQueues[2] = { NULL, NULL };
504
505 ev_async DBAddJob;   
506 ev_async DBExitEventLoop;
507
508 extern void ShutDownDBCLient(AsyncIO *IO);
509
510 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
511 {
512         HashList *q;
513         void *v;
514         HashPos  *It;
515         long len;
516         const char *Key;
517
518         /* get the control command... */
519         pthread_mutex_lock(&DBEventQueueMutex);
520
521         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
522                 DBInboundEventQueue = DBInboundEventQueues[1];
523                 q = DBInboundEventQueues[0];
524         }
525         else {
526                 DBInboundEventQueue = DBInboundEventQueues[0];
527                 q = DBInboundEventQueues[1];
528         }
529         pthread_mutex_unlock(&DBEventQueueMutex);
530
531         It = GetNewHashPos(q, 0);
532         while (GetNextHashPos(q, It, &len, &Key, &v))
533         {
534                 IOAddHandler *h = v;
535                 eNextState rc;
536                 rc = h->EvAttch(h->IO);
537                 switch (rc)
538                 {
539                 case eAbort:
540                     ShutDownDBCLient(h->IO);
541                 default:
542                     break;
543                 }
544         }
545         DeleteHashPos(&It);
546         DeleteHashContent(&q);
547         syslog(LOG_DEBUG, "DBEVENT Q Read done.\n");
548 }
549
550
551 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
552 {
553         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
554         ev_break(event_db, EVBREAK_ALL);
555 }
556
557
558
559 void DBInitEventQueue(void)
560 {
561         struct rlimit LimitSet;
562
563         pthread_mutex_init(&DBEventQueueMutex, NULL);
564
565         if (pipe(evdb_add_pipe) != 0) {
566                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
567                 abort();
568         }
569         LimitSet.rlim_cur = 1;
570         LimitSet.rlim_max = 1;
571         setrlimit(evdb_add_pipe[1], &LimitSet);
572
573         DBQueueEvents = NewHash(1, Flathash);
574         DBInboundEventQueues[0] = NewHash(1, Flathash);
575         DBInboundEventQueues[1] = NewHash(1, Flathash);
576         DBInboundEventQueue = DBInboundEventQueues[0];
577 }
578
579 /*
580  * this thread operates writing to the message database via libev.
581  * 
582  * 
583  */
584 void *db_event_thread(void *arg) 
585 {
586         struct CitContext libev_msg_CC;
587
588         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
589 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
590         syslog(LOG_DEBUG, "client_msgev_thread() initializing\n");
591
592         event_db = ev_loop_new (EVFLAG_AUTO);
593
594         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
595         ev_async_start(event_db, &DBAddJob);
596         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
597         ev_async_start(event_db, &DBExitEventLoop);
598
599         ev_run (event_db, 0);
600
601
602 //// what to do here?   CtdlClearSystemContext();
603         ev_loop_destroy (event_db);
604
605         DeleteHash(&DBQueueEvents);
606         DBInboundEventQueue = NULL;
607         DeleteHash(&DBInboundEventQueues[0]);
608         DeleteHash(&DBInboundEventQueues[1]);
609 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
610
611         return(NULL);
612 }
613
614 CTDL_MODULE_INIT(event_client)
615 {
616         if (!threading)
617         {
618                 InitEventQueue();
619                 DBInitEventQueue();
620                 CtdlThreadCreate(/*"Client event", */ client_event_thread);
621                 CtdlThreadCreate(/*"DB event", */db_event_thread);
622 /// todo register shutdown callback.
623         }
624         return "event";
625 }