free curl resources in the proper sequence.
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include <curl/curl.h>
50 #include <curl/multi.h>
51 #include "citadel.h"
52 #include "server.h"
53 #include "citserver.h"
54 #include "support.h"
55
56 #include "ctdl_module.h"
57
58 #include "event_client.h"
59 #include "serv_curl.h"
60
/* Event loop of the client event thread; client_event_thread() assigns it from ev_default_loop(). */
ev_loop *event_base;
62
/*****************************************************************************
 *                     libev / curl integration                              *
 *****************************************************************************/
/*
 * MOPT(s, v): set option CURLMOPT_<s> to v on the curl multi handle named
 * `mhnd`, which must be in scope where the macro is used.  On failure the
 * error is logged and the process exits with status 1.
 *
 * curl_multi_setopt() reports a CURLMcode, so the status is held in a local
 * of that type and decoded with curl_multi_strerror(); the macro no longer
 * needs (or clobbers) a caller-provided `sta` variable.
 */
#define MOPT(s, v)                                                      \
        do {                                                            \
                CURLMcode mopt_sta_ = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
                if (mopt_sta_ != CURLM_OK) {                            \
                        syslog(LOG_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_multi_strerror(mopt_sta_)); \
                        exit (1);                                       \
                }                                                       \
        } while (0)
74
75 typedef struct _evcurl_global_data {
76         int magic;
77         CURLM *mhnd;
78         ev_timer timeev;
79         int nrun;
80 } evcurl_global_data;
81
82 ev_async WakeupCurl;
83 evcurl_global_data global;
84
85 static void
86 gotstatus(evcurl_global_data *global, int nnrun) 
87 {
88         CURLM *mhnd;
89         CURLMsg *msg;
90         int nmsg;
91
92         global->nrun = nnrun;
93         mhnd = global->mhnd;
94
95         syslog(LOG_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
96         while ((msg = curl_multi_info_read(mhnd, &nmsg))) {
97                 syslog(LOG_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg);
98                 if (CURLMSG_DONE == msg->msg) {
99                         CURL *chnd;
100                         char *chandle;
101                         CURLcode sta;
102                         CURLMcode msta;
103                         AsyncIO  *IO;
104
105                         chandle = NULL;;
106                         chnd = msg->easy_handle;
107                         sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle);
108                         syslog(LOG_ERR, "EVCURL: request complete\n");
109                         if (sta)
110                                 syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
111                         IO = (AsyncIO *)chandle;
112                         
113 /////                        ev_io_stop(event_base, &IO->recv_event);
114
115                         sta = msg->data.result;
116                         if (sta) {
117                                 syslog(LOG_ERR, "EVCURL: error description: %s\n", IO->HttpReq.errdesc);
118                                 syslog(LOG_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta));
119                         }
120                         sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &IO->HttpReq.httpcode);
121                         if (sta)
122                                 syslog(LOG_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta));
123                         syslog(LOG_ERR, "EVCURL: http response code was %ld\n", (long)IO->HttpReq.httpcode);
124
125
126                         curl_slist_free_all(IO->HttpReq.headers);
127                         msta = curl_multi_remove_handle(mhnd, chnd);
128                         if (msta)
129                                 syslog(LOG_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta));
130
131                         curl_easy_cleanup(IO->HttpReq.chnd);
132                         IO->HttpReq.chnd = NULL;
133
134                         IO->HttpReq.attached = 0;
135                         IO->SendDone(IO);
136
137                         FreeStrBuf(&IO->HttpReq.ReplyData);
138                         FreeURL(&IO->ConnectMe);
139                         RemoveContext(IO->CitContext);
140                         IO->Terminate(IO);
141                 }
142         }
143 }
144
145 static void
146 stepmulti(evcurl_global_data *global, curl_socket_t fd) {
147         int nnrun;
148         CURLMcode msta;
149         
150         if (global == NULL) {
151             syslog(LOG_DEBUG, "EVCURL: stepmulti(NULL): wtf?\n");
152             return;
153         }
154         msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun);
155         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
156         if (msta)
157                 syslog(LOG_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta));
158         if (global->nrun != nnrun)
159                 gotstatus(global, nnrun);
160 }
161
162 static void
163 gottime(struct ev_loop *loop, ev_timer *timeev, int events) {
164         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
165         evcurl_global_data *global = (void *)timeev->data;
166         stepmulti(global, CURL_SOCKET_TIMEOUT);
167 }
168
169 static void
170 gotio(struct ev_loop *loop, ev_io *ioev, int events) {
171         syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
172         stepmulti(&global, ioev->fd);
173 }
174
175 static size_t
176 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
177         AsyncIO *IO = (AsyncIO*) cglobal;
178
179         if (IO->HttpReq.ReplyData == NULL)
180         {
181             IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
182         }
183         return CurlFillStrBuf_callback(data, size, nmemb, IO->HttpReq.ReplyData);
184 }
185
186 static int
187 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
188         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
189         evcurl_global_data *global = cglobal;
190         ev_timer_stop(EV_DEFAULT, &global->timeev);
191         if (tblock_ms < 0 || 14000 < tblock_ms)
192                 tblock_ms = 14000;
193         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
194         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
195         curl_multi_perform(global, CURL_POLL_NONE);
196         return 0;
197 }
198
199 static int
200 gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vio) {
201         evcurl_global_data *global = cglobal;
202         CURLM *mhnd = global->mhnd;
203         char *f;
204         AsyncIO *IO = (AsyncIO*) vio;
205         CURLcode sta;
206
207         syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
208
209         if (IO == NULL) {
210                 syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n");
211                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
212                 if (sta) {
213                         syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
214                         return -1;
215                 }
216                 IO = (AsyncIO *) f;
217                 ev_init(&IO->recv_event, &gotio);
218                 curl_multi_assign(mhnd, fd, IO);
219         }
220         if (action == CURL_POLL_REMOVE) {
221                 syslog(LOG_ERR,"EVCURL: called last time to unregister this sockwatcher\n");
222                 ev_io_stop(event_base, &IO->recv_event);
223         } else {
224                 int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0);
225                 ev_io_stop(event_base, &IO->recv_event);
226                 if (events) {
227                         ev_io_set(&IO->recv_event, fd, events);
228                         ev_io_start(event_base, &IO->recv_event);
229                 }
230         }
231         return 0;
232 }
233
234 void curl_init_connectionpool(void) 
235 {
236         CURLM *mhnd ;
237
238         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
239         global.timeev.data = (void *)&global;
240         global.nrun = -1;
241         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
242
243         if (sta) 
244         {
245                 syslog(LOG_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta));
246                 exit(1);
247         }
248         mhnd = global.mhnd = curl_multi_init();
249         if (!mhnd)
250         {
251                 syslog(LOG_ERR,"EVCURL: error initializing curl multi handle\n");
252                 exit(3);
253         }
254
255         MOPT(SOCKETFUNCTION, &gotwatchsock);
256         MOPT(SOCKETDATA, (void *)&global);
257         MOPT(TIMERFUNCTION, &gotwatchtime);
258         MOPT(TIMERDATA, (void *)&global);
259
260         return;
261 }
262
263
264
265
266 int evcurl_init(AsyncIO *IO, 
267                 void *CustomData, 
268                 const char* Desc,
269                 IO_CallBack CallBack, 
270                 IO_CallBack Terminate)
271 {
272         CURLcode sta;
273         CURL *chnd;
274
275         syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
276         IO->HttpReq.attached = 0;
277         IO->SendDone = CallBack;
278         IO->Terminate = Terminate;
279         chnd = IO->HttpReq.chnd = curl_easy_init();
280         if (!chnd)
281         {
282                 syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
283                 return 1;
284         }
285
286         strcpy(IO->HttpReq.errdesc, Desc);
287
288         OPT(VERBOSE, (long)1);
289                 /* unset in production */
290         OPT(NOPROGRESS, (long)1); 
291         OPT(NOSIGNAL, (long)1);
292         OPT(FAILONERROR, (long)1);
293         OPT(ENCODING, "");
294         OPT(FOLLOWLOCATION, (long)1);
295         OPT(MAXREDIRS, (long)7);
296         OPT(USERAGENT, CITADEL);
297
298         OPT(TIMEOUT, (long)1800);
299         OPT(LOW_SPEED_LIMIT, (long)64);
300         OPT(LOW_SPEED_TIME, (long)600);
301         OPT(CONNECTTIMEOUT, (long)600); 
302         OPT(PRIVATE, (void *)IO);
303
304         OPT(FORBID_REUSE, 1);
305         OPT(WRITEFUNCTION, &gotdata); 
306         OPT(WRITEDATA, (void *)IO);
307         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
308
309         if (
310                 (!IsEmptyStr(config.c_ip_addr))
311                 && (strcmp(config.c_ip_addr, "*"))
312                 && (strcmp(config.c_ip_addr, "::"))
313                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
314         ) {
315                 OPT(INTERFACE, config.c_ip_addr);
316         }
317                 /* point to a structure that points back to the perl structure and stuff */
318         syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
319         OPT(URL, IO->ConnectMe->PlainUrl);
320         if (StrLength(IO->ConnectMe->CurlCreds))
321         {
322                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
323                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
324         }
325 #ifdef CURLOPT_HTTP_CONTENT_DECODING
326         OPT(HTTP_CONTENT_DECODING, 1);
327         OPT(ENCODING, "");
328 #endif
329         if (StrLength(IO->HttpReq.PostData) > 0)
330         { 
331                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
332                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
333
334         }
335         else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL))
336         {
337                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
338                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
339         }
340
341         IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers, "Connection: close");
342         OPT(HTTPHEADER, IO->HttpReq.headers);
343
344         return 1;
345 }
346
347 eNextState
348 evcurl_handle_start(AsyncIO *IO) 
349 {
350         CURLMcode msta;
351         
352         syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
353         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
354         if (msta)
355                 syslog(LOG_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta));
356         IO->HttpReq.attached = 1;
357         ev_async_send (event_base, &WakeupCurl);
358         return eReadMessage;
359 }
360
361 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
362 {
363         syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
364
365         curl_multi_perform(&global, CURL_POLL_NONE);
366 }
367
368 static void evcurl_shutdown (void)
369 {
370         curl_multi_cleanup(global.mhnd);
371 }
/*****************************************************************************
 *                         libev integration                                 *
 *****************************************************************************/
375 /*
376  * client event queue plus its methods.
377  * this currently is the main loop (which may change in some future?)
378  */
379 int evbase_count = 0;
380 int event_add_pipe[2] = {-1, -1};
381 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
382 HashList *QueueEvents = NULL;
383 HashList *InboundEventQueue = NULL;
384 HashList *InboundEventQueues[2] = { NULL, NULL };
385
386 ev_async AddJob;   
387 ev_async ExitEventLoop;
388
389 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
390 {
391         HashList *q;
392         void *v;
393         HashPos  *It;
394         long len;
395         const char *Key;
396
397         /* get the control command... */
398         pthread_mutex_lock(&EventQueueMutex);
399
400         if (InboundEventQueues[0] == InboundEventQueue) {
401                 InboundEventQueue = InboundEventQueues[1];
402                 q = InboundEventQueues[0];
403         }
404         else {
405                 InboundEventQueue = InboundEventQueues[0];
406                 q = InboundEventQueues[1];
407         }
408         pthread_mutex_unlock(&EventQueueMutex);
409
410         It = GetNewHashPos(q, 0);
411         while (GetNextHashPos(q, It, &len, &Key, &v))
412         {
413                 IOAddHandler *h = v;
414                 h->EvAttch(h->IO);
415         }
416         DeleteHashPos(&It);
417         DeleteHashContent(&q);
418         syslog(LOG_DEBUG, "EVENT Q Read done.\n");
419 }
420
421
422 static void EventExitCallback(EV_P_ ev_async *w, int revents)
423 {
424         ev_break(event_base, EVBREAK_ALL);
425
426         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
427 }
428
429
430
431 void InitEventQueue(void)
432 {
433         struct rlimit LimitSet;
434
435         pthread_mutex_init(&EventQueueMutex, NULL);
436
437         if (pipe(event_add_pipe) != 0) {
438                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
439                 abort();
440         }
441         LimitSet.rlim_cur = 1;
442         LimitSet.rlim_max = 1;
443         setrlimit(event_add_pipe[1], &LimitSet);
444
445         QueueEvents = NewHash(1, Flathash);
446         InboundEventQueues[0] = NewHash(1, Flathash);
447         InboundEventQueues[1] = NewHash(1, Flathash);
448         InboundEventQueue = InboundEventQueues[0];
449 }
450 /*
451  * this thread operates the select() etc. via libev.
452  * 
453  * 
454  */
455 void *client_event_thread(void *arg) 
456 {
457         struct CitContext libev_client_CC;
458
459         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
460 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
461         syslog(LOG_DEBUG, "client_ev_thread() initializing\n");
462
463         event_base = ev_default_loop (EVFLAG_AUTO);
464
465         ev_async_init(&AddJob, QueueEventAddCallback);
466         ev_async_start(event_base, &AddJob);
467         ev_async_init(&ExitEventLoop, EventExitCallback);
468         ev_async_start(event_base, &ExitEventLoop);
469         ev_async_init(&WakeupCurl, WakeupCurlCallback);
470         ev_async_start(event_base, &WakeupCurl);
471
472         curl_init_connectionpool();
473
474         ev_run (event_base, 0);
475
476
477 ///what todo here?      CtdlClearSystemContext();
478         ev_loop_destroy (EV_DEFAULT_UC);
479         
480         DeleteHash(&QueueEvents);
481         InboundEventQueue = NULL;
482         DeleteHash(&InboundEventQueues[0]);
483         DeleteHash(&InboundEventQueues[1]);
484 /*      citthread_mutex_destroy(&EventQueueMutex); TODO */
485         evcurl_shutdown();
486
487         return(NULL);
488 }
489 /*------------------------------------------------------------------------------*/
490 /*
491  * DB-Queue; does async bdb operations.
492  * has its own set of handlers.
493  */
494 ev_loop *event_db;
495 int evdb_count = 0;
496 int evdb_add_pipe[2] = {-1, -1};
497 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
498 HashList *DBQueueEvents = NULL;
499 HashList *DBInboundEventQueue = NULL;
500 HashList *DBInboundEventQueues[2] = { NULL, NULL };
501
502 ev_async DBAddJob;   
503 ev_async DBExitEventLoop;
504
505 extern void ShutDownDBCLient(AsyncIO *IO);
506
507 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
508 {
509         HashList *q;
510         void *v;
511         HashPos  *It;
512         long len;
513         const char *Key;
514
515         /* get the control command... */
516         pthread_mutex_lock(&DBEventQueueMutex);
517
518         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
519                 DBInboundEventQueue = DBInboundEventQueues[1];
520                 q = DBInboundEventQueues[0];
521         }
522         else {
523                 DBInboundEventQueue = DBInboundEventQueues[0];
524                 q = DBInboundEventQueues[1];
525         }
526         pthread_mutex_unlock(&DBEventQueueMutex);
527
528         It = GetNewHashPos(q, 0);
529         while (GetNextHashPos(q, It, &len, &Key, &v))
530         {
531                 IOAddHandler *h = v;
532                 eNextState rc;
533                 rc = h->EvAttch(h->IO);
534                 switch (rc)
535                 {
536                 case eAbort:
537                     ShutDownDBCLient(h->IO);
538                 default:
539                     break;
540                 }
541         }
542         DeleteHashPos(&It);
543         DeleteHashContent(&q);
544         syslog(LOG_DEBUG, "DBEVENT Q Read done.\n");
545 }
546
547
548 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
549 {
550         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
551         ev_break(event_db, EVBREAK_ALL);
552 }
553
554
555
556 void DBInitEventQueue(void)
557 {
558         struct rlimit LimitSet;
559
560         pthread_mutex_init(&DBEventQueueMutex, NULL);
561
562         if (pipe(evdb_add_pipe) != 0) {
563                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
564                 abort();
565         }
566         LimitSet.rlim_cur = 1;
567         LimitSet.rlim_max = 1;
568         setrlimit(evdb_add_pipe[1], &LimitSet);
569
570         DBQueueEvents = NewHash(1, Flathash);
571         DBInboundEventQueues[0] = NewHash(1, Flathash);
572         DBInboundEventQueues[1] = NewHash(1, Flathash);
573         DBInboundEventQueue = DBInboundEventQueues[0];
574 }
575
576 /*
577  * this thread operates writing to the message database via libev.
578  * 
579  * 
580  */
581 void *db_event_thread(void *arg) 
582 {
583         struct CitContext libev_msg_CC;
584
585         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
586 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
587         syslog(LOG_DEBUG, "client_msgev_thread() initializing\n");
588
589         event_db = ev_loop_new (EVFLAG_AUTO);
590
591         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
592         ev_async_start(event_db, &DBAddJob);
593         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
594         ev_async_start(event_db, &DBExitEventLoop);
595
596         ev_run (event_db, 0);
597
598
599 //// what to do here?   CtdlClearSystemContext();
600         ev_loop_destroy (event_db);
601
602         DeleteHash(&DBQueueEvents);
603         DBInboundEventQueue = NULL;
604         DeleteHash(&DBInboundEventQueues[0]);
605         DeleteHash(&DBInboundEventQueues[1]);
606 /*      citthread_mutex_destroy(&DBEventQueueMutex); TODO */
607
608         return(NULL);
609 }
610
611 CTDL_MODULE_INIT(event_client)
612 {
613         if (!threading)
614         {
615                 InitEventQueue();
616                 DBInitEventQueue();
617                 CtdlThreadCreate(/*"Client event", */ client_event_thread);
618                 CtdlThreadCreate(/*"DB event", */db_event_thread);
619 /// todo register shutdown callback.
620         }
621         return "event";
622 }