if specified, use our bind-ip for the http clients.
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include <curl/curl.h>
50 #include <curl/multi.h>
51 #include "citadel.h"
52 #include "server.h"
53 #include "citserver.h"
54 #include "support.h"
55
56 #include "ctdl_module.h"
57
58 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
59
60 #include "event_client.h"
61 #include "serv_curl.h"
62
63 int event_add_pipe[2] = {-1, -1};
64
65 citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
66 HashList *QueueEvents = NULL;
67
68 HashList *InboundEventQueue = NULL;
69 HashList *InboundEventQueues[2] = { NULL, NULL };
70
71 struct ev_loop *event_base;
72
73 ev_async AddJob;   
74 ev_async ExitEventLoop;
75 ev_async WakeupCurl;
76
77 extern struct ev_loop *event_base;
78
79 void SockStateCb(void *data, int sock, int read, int write);
80
/*
 * MOPT(s, v): set option CURLMOPT_<s> to value v on the curl multi handle
 * named `mhnd` in the calling scope.  On failure the error is logged and
 * the process exits with status 1.
 *
 * curl_multi_setopt() returns a CURLMcode, so the result is kept in a
 * macro-local CURLMcode and decoded with curl_multi_strerror() rather than
 * being squeezed through the caller's CURLcode variable.
 */
#define MOPT(s, v)                                                      \
        do {                                                            \
                CURLMcode mopt_rc_ = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
                if (mopt_rc_ != CURLM_OK) {                             \
                        CtdlLogPrintf(CTDL_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_multi_strerror(mopt_rc_)); \
                        exit (1);                                       \
                }                                                       \
        } while (0)
89
90
91
92 /*****************************************************************************
93  *                   libevent / curl integration                             *
94  *****************************************************************************/
95 typedef struct _evcurl_global_data {
96         int magic;
97         CURLM *mhnd;
98         ev_timer timeev;
99         int nrun;
100 } evcurl_global_data;
101
102 typedef struct _sockwatcher_data 
103 {
104         evcurl_global_data *global;
105         ev_io ioev;
106 } sockwatcher_data;
107
108 evcurl_global_data global;
109
110 static void
111 gotstatus(evcurl_global_data *global, int nnrun) 
112 {
113         CURLM *mhnd;
114         CURLMsg *msg;
115         int nmsg;
116 /*
117   if (EVCURL_GLOBAL_MAGIC != global.magic)
118   {
119   CtdlLogPrintf(CTDL_ERR, "internal error: gotstatus on wrong struct");
120   return;
121   }
122 */
123         global->nrun = nnrun;
124         mhnd = global->mhnd;
125
126         CtdlLogPrintf(CTDL_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
127         while ((msg = curl_multi_info_read(mhnd, &nmsg))) {
128                 CtdlLogPrintf(CTDL_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg);
129                 if (CURLMSG_DONE == msg->msg) {
130                         CURL *chnd;
131                         char *chandle;
132                         CURLcode sta;
133                         CURLMcode msta;
134                         AsyncIO  *IO;
135
136                         chandle = NULL;;
137                         chnd = msg->easy_handle;
138                         sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle);
139                         CtdlLogPrintf(CTDL_ERR, "EVCURL: request complete\n");
140                         if (sta)
141                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
142                         IO = (AsyncIO *)chandle;
143                         
144                         sta = msg->data.result;
145                         if (sta) {
146                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error description: %s\n", IO->HttpReq.errdesc);
147                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta));
148                         }
149                         sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &IO->HttpReq.httpcode);
150                         if (sta)
151                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta));
152                         CtdlLogPrintf(CTDL_ERR, "EVCURL: http response code was %ld\n", (long)IO->HttpReq.httpcode);
153                         msta = curl_multi_remove_handle(mhnd, chnd);
154                         if (msta)
155                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta));
156
157                         IO->HttpReq.attached = 0;
158                         IO->SendDone(IO);
159                         curl_easy_cleanup(IO->HttpReq.chnd);
160                         curl_slist_free_all(IO->HttpReq.headers);
161                         FreeStrBuf(&IO->HttpReq.ReplyData);
162                         FreeURL(&IO->ConnectMe);
163                         RemoveContext(IO->CitContext);
164                         free(IO);
165                 }
166         }
167 }
168
169 static void
170 stepmulti(evcurl_global_data *global, curl_socket_t fd) {
171         int nnrun;
172         CURLMcode msta;
173         
174         if (global == NULL) {
175             CtdlLogPrintf(CTDL_DEBUG, "EVCURL: stepmulti(NULL): wtf?\n");
176             return;
177         }
178         msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun);
179         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
180         if (msta)
181                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta));
182         if (global->nrun != nnrun)
183                 gotstatus(global, nnrun);
184 }
185
186 static void
187 gottime(struct ev_loop *loop, ev_timer *timeev, int events) {
188         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl for timeout\n");
189         evcurl_global_data *global = (void *)timeev->data;
190         stepmulti(global, CURL_SOCKET_TIMEOUT);
191 }
192
193 static void
194 gotio(struct ev_loop *loop, ev_io *ioev, int events) {
195         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
196         sockwatcher_data *sockwatcher = (void *)ioev->data;
197         stepmulti(sockwatcher->global, ioev->fd);
198 }
199
200 static size_t
201 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
202         AsyncIO *IO = (AsyncIO*) cglobal;
203         //evcurl_request_data *D = (evcurl_request_data*) data;
204         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: gotdata(): calling CurlFillStrBuf_callback()\n");
205
206         if (IO->HttpReq.ReplyData == NULL)
207         {
208             IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
209         }
210         return CurlFillStrBuf_callback(data, size, nmemb, IO->HttpReq.ReplyData);
211 }
212
213 static int
214 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
215         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
216         evcurl_global_data *global = cglobal;
217         ev_timer_stop(EV_DEFAULT, &global->timeev);
218         if (tblock_ms < 0 || 14000 < tblock_ms)
219                 tblock_ms = 14000;
220         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
221         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
222         curl_multi_perform(global, CURL_POLL_NONE);
223         return 0;
224 }
225
226 static int
227 gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *csockwatcher) {
228         evcurl_global_data *global = cglobal;
229         CURLM *mhnd = global->mhnd;
230         sockwatcher_data *sockwatcher = csockwatcher;
231
232         CtdlLogPrintf(CTDL_DEBUG,"EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
233
234         if (!sockwatcher) {
235                 CtdlLogPrintf(CTDL_ERR,"EVCURL: called first time to register this sockwatcker\n");
236                 sockwatcher = malloc(sizeof(sockwatcher_data));
237                 sockwatcher->global = global;
238                 ev_init(&sockwatcher->ioev, &gotio);
239                 sockwatcher->ioev.data = (void *)sockwatcher;
240                 curl_multi_assign(mhnd, fd, sockwatcher);
241         }
242         if (CURL_POLL_REMOVE == action) {
243                 CtdlLogPrintf(CTDL_ERR,"EVCURL: called last time to unregister this sockwatcher\n");
244                 ev_io_stop(event_base, &sockwatcher->ioev);
245                 free(sockwatcher);
246         } else {
247                 int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0);
248                 ev_io_stop(EV_DEFAULT, &sockwatcher->ioev);
249                 if (events) {
250                         ev_io_set(&sockwatcher->ioev, fd, events);
251                         ev_io_start(EV_DEFAULT, &sockwatcher->ioev);
252                 }
253         }
254         return 0;
255 }
256
257 void curl_init_connectionpool(void) 
258 {
259         CURLM *mhnd ;
260 //      global.magic = EVCURL_GLOBAL_MAGIC;
261
262         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
263         global.timeev.data = (void *)&global;
264         global.nrun = -1;
265         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
266
267         if (sta) 
268         {
269                 CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta));
270                 exit(1);
271         }
272 /*
273   if (!ev_default_loop(EVFLAG_AUTO))
274   {
275   CtdlLogPrintf(CTDL_ERR,"error initializing libev\n");
276   exit(2);
277   }
278 */
279         mhnd = global.mhnd = curl_multi_init();
280         if (!mhnd)
281         {
282                 CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl multi handle\n");
283                 exit(3);
284         }
285
286         MOPT(SOCKETFUNCTION, &gotwatchsock);
287         MOPT(SOCKETDATA, (void *)&global);
288         MOPT(TIMERFUNCTION, &gotwatchtime);
289         MOPT(TIMERDATA, (void *)&global);
290
291         /* well, just there to fire the sample request?*/
292 ///     ev_timer_start(EV_DEFAULT, &global.timeev);
293         return;
294 }
295
296
297
298
299 int evcurl_init(AsyncIO *IO, 
300                 void *CustomData, 
301                 const char* Desc,
302                 IO_CallBack CallBack) 
303 {
304         CURLcode sta;
305         CURL *chnd;
306
307         CtdlLogPrintf(CTDL_DEBUG,"EVCURL: evcurl_init called ms\n");
308         IO->HttpReq.attached = 0;
309         IO->SendDone = CallBack;
310         chnd = IO->HttpReq.chnd = curl_easy_init();
311         if (!chnd)
312         {
313                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error initializing curl handle\n");
314                 return 1;
315         }
316
317         strcpy(IO->HttpReq.errdesc, Desc);
318
319         OPT(VERBOSE, (long)1);
320                 /* unset in production */
321         OPT(NOPROGRESS, (long)1); 
322         OPT(NOSIGNAL, (long)1);
323         OPT(FAILONERROR, (long)1);
324         OPT(ENCODING, "");
325         OPT(FOLLOWLOCATION, (long)1);
326         OPT(MAXREDIRS, (long)7);
327         OPT(USERAGENT, CITADEL);
328
329         OPT(TIMEOUT, (long)1800);
330         OPT(LOW_SPEED_LIMIT, (long)64);
331         OPT(LOW_SPEED_TIME, (long)600);
332         OPT(CONNECTTIMEOUT, (long)600); 
333         OPT(PRIVATE, (void *)IO);
334
335
336         OPT(WRITEFUNCTION, &gotdata); 
337         OPT(WRITEDATA, (void *)IO);
338         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
339
340         if (
341                 (!IsEmptyStr(config.c_ip_addr))
342                 && (strcmp(config.c_ip_addr, "*"))
343                 && (strcmp(config.c_ip_addr, "::"))
344                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
345         ) {
346                 OPT(INTERFACE, config.c_ip_addr);
347         }
348                 /* point to a structure that points back to the perl structure and stuff */
349         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
350         OPT(URL, IO->ConnectMe->PlainUrl);
351         if (StrLength(IO->ConnectMe->CurlCreds))
352         {
353                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
354                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
355         }
356 #ifdef CURLOPT_HTTP_CONTENT_DECODING
357         OPT(HTTP_CONTENT_DECODING, 1);
358         OPT(ENCODING, "");
359 #endif
360         if (StrLength(IO->HttpReq.PostData) > 0)
361         { 
362                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
363                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
364
365         }
366         else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL))
367         {
368                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
369                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
370         }
371
372         if (IO->HttpReq.headers != NULL)
373                 OPT(HTTPHEADER, IO->HttpReq.headers);
374
375         return 1;
376 }
377
378 void
379 evcurl_handle_start(AsyncIO *IO) 
380 {
381         CURLMcode msta;
382         
383         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: attaching to curl multi handle\n");
384         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
385         if (msta)
386                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta));
387         IO->HttpReq.attached = 1;
388 //      ev_timer_start(EV_DEFAULT, &global.timeev);
389         ev_async_send (event_base, &WakeupCurl);
390 }
391
392 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
393 {
394 ///     evcurl_global_data *global = cglobal;
395
396         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl multi handle\n");
397
398         curl_multi_perform(&global, CURL_POLL_NONE);
399 }
400
401 static void evcurl_shutdown (void)
402 {
403         curl_multi_cleanup(global.mhnd);
404 }
405 /*****************************************************************************
406  *                       libevent integration                                *
407  *****************************************************************************/
408
409
410 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
411 {
412         HashList *q;
413         void *v;
414         HashPos  *It;
415         long len;
416         const char *Key;
417
418         /* get the control command... */
419         citthread_mutex_lock(&EventQueueMutex);
420
421         if (InboundEventQueues[0] == InboundEventQueue) {
422                 InboundEventQueue = InboundEventQueues[1];
423                 q = InboundEventQueues[0];
424         }
425         else {
426                 InboundEventQueue = InboundEventQueues[0];
427                 q = InboundEventQueues[1];
428         }
429         citthread_mutex_unlock(&EventQueueMutex);
430
431         It = GetNewHashPos(q, 0);
432         while (GetNextHashPos(q, It, &len, &Key, &v))
433         {
434                 IOAddHandler *h = v;
435                 h->EvAttch(h->IO);
436         }
437         DeleteHashPos(&It);
438         DeleteHashContent(&q);
439         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
440 }
441
442
443 static void EventExitCallback(EV_P_ ev_async *w, int revents)
444 {
445         ev_unloop(event_base, EVUNLOOP_ALL);
446
447         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n");
448 }
449
450
451
452 void InitEventQueue(void)
453 {
454         struct rlimit LimitSet;
455
456         citthread_mutex_init(&EventQueueMutex, NULL);
457
458         if (pipe(event_add_pipe) != 0) {
459                 CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
460                 abort();
461         }
462         LimitSet.rlim_cur = 1;
463         LimitSet.rlim_max = 1;
464         setrlimit(event_add_pipe[1], &LimitSet);
465
466         QueueEvents = NewHash(1, Flathash);
467         InboundEventQueues[0] = NewHash(1, Flathash);
468         InboundEventQueues[1] = NewHash(1, Flathash);
469         InboundEventQueue = InboundEventQueues[0];
470 }
471 /*
472  * this thread operates the select() etc. via libev.
473  * 
474  * 
475  */
476 void *client_event_thread(void *arg) 
477 {
478         struct CitContext libevent_client_CC;
479
480         CtdlFillSystemContext(&libevent_client_CC, "LibEv Thread");
481 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
482         CtdlLogPrintf(CTDL_DEBUG, "client_ev_thread() initializing\n");
483
484         event_base = ev_default_loop (EVFLAG_AUTO);
485
486         ev_async_init(&AddJob, QueueEventAddCallback);
487         ev_async_start(event_base, &AddJob);
488         ev_async_init(&ExitEventLoop, EventExitCallback);
489         ev_async_start(event_base, &ExitEventLoop);
490         ev_async_init(&WakeupCurl, WakeupCurlCallback);
491         ev_async_start(event_base, &WakeupCurl);
492
493         curl_init_connectionpool();
494
495         ev_loop (event_base, 0);
496
497
498         CtdlClearSystemContext();
499         ev_default_destroy ();
500         
501         DeleteHash(&QueueEvents);
502         InboundEventQueue = NULL;
503         DeleteHash(&InboundEventQueues[0]);
504         DeleteHash(&InboundEventQueues[1]);
505         citthread_mutex_destroy(&EventQueueMutex);
506         evcurl_shutdown();
507
508         return(NULL);
509 }
510
511 #endif
512
/*
 * Module entry point.  When the experimental event client is compiled in
 * and `threading` is false, the event queue is initialised and the event
 * thread is created.  Always returns the module name "event".
 */
CTDL_MODULE_INIT(event_client)
{
#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
        if (threading)
                return "event";

        InitEventQueue();
        CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
        /* TODO: register shutdown callback. */
#endif
        return "event";
}