sending http requests instant now.
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include <curl/curl.h>
50 #include <curl/multi.h>
51 #include "citadel.h"
52 #include "server.h"
53 #include "citserver.h"
54 #include "support.h"
55
56 #include "ctdl_module.h"
57
58 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
59
60 #include "event_client.h"
61 #include "serv_curl.h"
62
63 int event_add_pipe[2] = {-1, -1};
64
65 citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
66 HashList *QueueEvents = NULL;
67
68 HashList *InboundEventQueue = NULL;
69 HashList *InboundEventQueues[2] = { NULL, NULL };
70
71 struct ev_loop *event_base;
72
73 ev_async AddJob;   
74 ev_async ExitEventLoop;
75 ev_async WakeupCurl;
76
77 extern struct ev_loop *event_base;
78
79 void SockStateCb(void *data, int sock, int read, int write);
80
/*
 * Set option CURLMOPT_<s> to value v on the curl multi handle `mhnd`, which
 * must be in scope where the macro is used.  On failure the error is logged
 * and the process exits with status 1.
 *
 * curl_multi_setopt() returns a CURLMcode, so the status is held in a
 * macro-local CURLMcode and decoded with curl_multi_strerror(); decoding it
 * with curl_easy_strerror() would print the text of an unrelated CURLcode.
 */
#define MOPT(s, v)                                                      \
        do {                                                            \
                CURLMcode mopt_sta_ = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \
                if (mopt_sta_) {                                        \
                        CtdlLogPrintf(CTDL_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_multi_strerror(mopt_sta_)); \
                        exit (1);                                       \
                }                                                       \
        } while (0)
89
90
91
/*****************************************************************************
 *                    libev / curl integration                               *
 *****************************************************************************/
95
96
/*
 * State shared by the curl glue in this file: the curl multi handle (mhnd),
 * the timer watcher that drives curl timeouts (timeev) and the last known
 * number of running transfers (nrun).  Set up by curl_init_connectionpool().
 */
evcurl_global_data global;
98
99 static void
100 gotstatus(evcurl_global_data *global, int nnrun) 
101 {
102         CURLM *mhnd;
103         CURLMsg *msg;
104         int nmsg;
105 /*
106   if (EVCURL_GLOBAL_MAGIC != global.magic)
107   {
108   CtdlLogPrintf(CTDL_ERR, "internal error: gotstatus on wrong struct");
109   return;
110   }
111 */
112         global->nrun = nnrun;
113         mhnd = global->mhnd;
114
115         CtdlLogPrintf(CTDL_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
116         while ((msg = curl_multi_info_read(mhnd, &nmsg))) {
117                 CtdlLogPrintf(CTDL_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg);
118                 if (CURLMSG_DONE == msg->msg) {
119                         CtdlLogPrintf(CTDL_ERR, "EVCURL: request complete\n");
120                         CURL *chnd = msg->easy_handle;
121                         char *chandle = NULL;;
122                         CURLcode sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle);
123                         if (sta)
124                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
125                         evcurl_request_data  *handle = (void *)chandle;
126                         if (global != handle->global || chnd != handle->chnd)
127                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: internal evcurl error: unknown curl handle completed\n");
128                         sta = msg->data.result;
129                         if (sta) {
130                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error description: %s\n", handle->errdesc);
131                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta));
132                         }
133                         long httpcode;
134                         sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &httpcode);
135                         if (sta)
136                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta));
137                         CtdlLogPrintf(CTDL_ERR, "EVCURL: http response code was %ld\n", (long)httpcode);
138                         CURLMcode msta = curl_multi_remove_handle(mhnd, chnd);
139                         if (msta)
140                                 CtdlLogPrintf(CTDL_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta));
141                         handle->attached = 0;
142                 }
143         }
144 }
145
146 static void
147 stepmulti(evcurl_global_data *global, curl_socket_t fd) {
148         int nnrun;
149         CURLMcode msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun);
150         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
151         if (msta)
152                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta));
153         if (global->nrun != nnrun)
154                 gotstatus(global, nnrun);
155 }
156
157 static void
158 gottime(struct ev_loop *loop, ev_timer *timeev, int events) {
159         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl for timeout\n");
160         evcurl_global_data *global = (void *)timeev->data;
161         stepmulti(global, CURL_SOCKET_TIMEOUT);
162 }
163
164 static void
165 gotio(struct ev_loop *loop, ev_io *ioev, int events) {
166         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
167         sockwatcher_data *sockwatcher = (void *)ioev->data;
168         stepmulti(sockwatcher->global, ioev->fd);
169 }
170
171 static size_t
172 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
173         evcurl_request_data *D = (evcurl_request_data*) data;
174         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: gotdata(): calling CurlFillStrBuf_callback()\n");
175         return CurlFillStrBuf_callback(D->ReplyData, size, nmemb, cglobal);
176 }
177
178 static int
179 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
180         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
181         evcurl_global_data *global = cglobal;
182         ev_timer_stop(EV_DEFAULT, &global->timeev);
183         if (tblock_ms < 0 || 14000 < tblock_ms)
184                 tblock_ms = 14000;
185         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
186         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
187         curl_multi_perform(global, CURL_POLL_NONE);
188         return 0;
189 }
190
191 static int
192 gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *csockwatcher) {
193         evcurl_global_data *global = cglobal;
194         CURLM *mhnd = global->mhnd;
195         sockwatcher_data *sockwatcher = csockwatcher;
196
197         CtdlLogPrintf(CTDL_DEBUG,"EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
198
199         if (!sockwatcher) {
200                 CtdlLogPrintf(CTDL_ERR,"EVCURL: called first time to register this sockwatcker\n");
201                 sockwatcher = malloc(sizeof(sockwatcher_data));
202                 sockwatcher->global = global;
203                 ev_init(&sockwatcher->ioev, &gotio);
204                 sockwatcher->ioev.data = (void *)sockwatcher;
205                 curl_multi_assign(mhnd, fd, sockwatcher);
206         }
207         if (CURL_POLL_REMOVE == action) {
208                 CtdlLogPrintf(CTDL_ERR,"EVCURL: called last time to unregister this sockwatcher\n");
209                 free(sockwatcher);
210         } else {
211                 int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0);
212                 ev_io_stop(EV_DEFAULT, &sockwatcher->ioev);
213                 if (events) {
214                         ev_io_set(&sockwatcher->ioev, fd, events);
215                         ev_io_start(EV_DEFAULT, &sockwatcher->ioev);
216                 }
217         }
218         return 0;
219 }
220
221 void curl_init_connectionpool(void) 
222 {
223         CURLM *mhnd ;
224 //      global.magic = EVCURL_GLOBAL_MAGIC;
225
226         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
227         global.timeev.data = (void *)&global;
228         global.nrun = -1;
229         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
230
231         if (sta) 
232         {
233                 CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta));
234                 exit(1);
235         }
236 /*
237   if (!ev_default_loop(EVFLAG_AUTO))
238   {
239   CtdlLogPrintf(CTDL_ERR,"error initializing libev\n");
240   exit(2);
241   }
242 */
243         mhnd = global.mhnd = curl_multi_init();
244         if (!mhnd)
245         {
246                 CtdlLogPrintf(CTDL_ERR,"EVCURL: error initializing curl multi handle\n");
247                 exit(3);
248         }
249
250         MOPT(SOCKETFUNCTION, &gotwatchsock);
251         MOPT(SOCKETDATA, (void *)&global);
252         MOPT(TIMERFUNCTION, &gotwatchtime);
253         MOPT(TIMERDATA, (void *)&global);
254
255         /* well, just there to fire the sample request?*/
256 ///     ev_timer_start(EV_DEFAULT, &global.timeev);
257         return;
258 }
259
260
261
262
263 int evcurl_init(evcurl_request_data *handle, 
264                 void *CustomData, 
265                 const char* Desc,
266                 int CallBack) 
267 {
268         CURLcode sta;
269         CURL *chnd;
270
271         CtdlLogPrintf(CTDL_DEBUG,"EVCURL: evcurl_init called ms\n");
272         handle->global = &global;
273         handle->attached = 0;
274         chnd = handle->chnd = curl_easy_init();
275         if (!chnd)
276         {
277                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error initializing curl handle\n");
278                 return 1;
279         }
280
281         strcpy(handle->errdesc, Desc);
282
283         OPT(VERBOSE, (long)1);
284                 /* unset in production */
285         OPT(NOPROGRESS, (long)1); 
286         OPT(NOSIGNAL, (long)1);
287         OPT(FAILONERROR, (long)1);
288         OPT(ENCODING, "");
289         OPT(FOLLOWLOCATION, (long)1);
290         OPT(MAXREDIRS, (long)7);
291         OPT(USERAGENT, CITADEL);
292
293         OPT(TIMEOUT, (long)1800);
294         OPT(LOW_SPEED_LIMIT, (long)64);
295         OPT(LOW_SPEED_TIME, (long)600);
296         OPT(CONNECTTIMEOUT, (long)600); 
297         OPT(PRIVATE, (void *)handle);
298
299
300         OPT(WRITEFUNCTION, &gotdata); 
301         OPT(WRITEDATA, (void *)handle);
302         OPT(ERRORBUFFER, handle->errdesc);
303
304                 /* point to a structure that points back to the perl structure and stuff */
305         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: Loading URL: %s\n", handle->URL->PlainUrl);
306         OPT(URL, handle->URL->PlainUrl);
307         if (StrLength(handle->URL->CurlCreds))
308         {
309                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
310                 OPT(USERPWD, ChrPtr(handle->URL->CurlCreds));
311         }
312 #ifdef CURLOPT_HTTP_CONTENT_DECODING
313         OPT(HTTP_CONTENT_DECODING, 1);
314         OPT(ENCODING, "");
315 #endif
316         if (StrLength(handle->PostData) > 0)
317         { 
318                 OPT(POSTFIELDS, ChrPtr(handle->PostData));
319                 OPT(POSTFIELDSIZE, StrLength(handle->PostData));
320
321         }
322         else if ((handle->PlainPostDataLen != 0) && (handle->PlainPostData != NULL))
323         {
324                 OPT(POSTFIELDS, handle->PlainPostData);
325                 OPT(POSTFIELDSIZE, handle->PlainPostDataLen);
326         }
327
328         if (handle->headers != NULL)
329                 OPT(HTTPHEADER, handle->headers);
330
331         return 1;
332 }
333
334 void
335 evcurl_handle_start(evcurl_request_data *handle) 
336 {
337         CURLMcode msta;
338         
339         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: attaching to curl multi handle\n");
340         msta = curl_multi_add_handle(handle->global->mhnd, handle->chnd);
341         if (msta)
342                 CtdlLogPrintf(CTDL_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta));
343         handle->attached = 1;
344 //      ev_timer_start(EV_DEFAULT, &global.timeev);
345         ev_async_send (event_base, &WakeupCurl);
346 }
347
348 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
349 {
350 ///     evcurl_global_data *global = cglobal;
351
352         CtdlLogPrintf(CTDL_DEBUG, "EVCURL: waking up curl multi handle\n");
353
354         curl_multi_perform(&global, CURL_POLL_NONE);
355 }
356
/*****************************************************************************
 *                         libev integration                                 *
 *****************************************************************************/
360
361
362 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
363 {
364         HashList *q;
365         void *v;
366         HashPos  *It;
367         long len;
368         const char *Key;
369
370         /* get the control command... */
371         citthread_mutex_lock(&EventQueueMutex);
372
373         if (InboundEventQueues[0] == InboundEventQueue) {
374                 InboundEventQueue = InboundEventQueues[1];
375                 q = InboundEventQueues[0];
376         }
377         else {
378                 InboundEventQueue = InboundEventQueues[0];
379                 q = InboundEventQueues[1];
380         }
381         citthread_mutex_unlock(&EventQueueMutex);
382
383         It = GetNewHashPos(q, 0);
384         while (GetNextHashPos(q, It, &len, &Key, &v))
385         {
386                 IOAddHandler *h = v;
387                 h->EvAttch(h->IO);
388         }
389         DeleteHashPos(&It);
390         DeleteHashContent(&q);
391         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
392 }
393
394
395 static void EventExitCallback(EV_P_ ev_async *w, int revents)
396 {
397         ev_unloop(event_base, EVUNLOOP_ALL);
398
399         CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n");
400 }
401
402
403
404 void InitEventQueue(void)
405 {
406         struct rlimit LimitSet;
407
408         citthread_mutex_init(&EventQueueMutex, NULL);
409
410         if (pipe(event_add_pipe) != 0) {
411                 CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
412                 abort();
413         }
414         LimitSet.rlim_cur = 1;
415         LimitSet.rlim_max = 1;
416         setrlimit(event_add_pipe[1], &LimitSet);
417
418         QueueEvents = NewHash(1, Flathash);
419         InboundEventQueues[0] = NewHash(1, Flathash);
420         InboundEventQueues[1] = NewHash(1, Flathash);
421         InboundEventQueue = InboundEventQueues[0];
422 }
/*
 * this thread operates the select() etc. via libev.
 *
 * Thread entry point: obtains the default libev loop, registers the three
 * async watchers (AddJob, ExitEventLoop, WakeupCurl), initialises the curl
 * glue and then runs the loop until it is told to exit.  Afterwards the loop,
 * the queues and the queue mutex created by InitEventQueue() are destroyed.
 *
 * arg is unused; the return value is always NULL.
 */
void *client_event_thread(void *arg) 
{
        struct CitContext libevent_client_CC;

        CtdlFillSystemContext(&libevent_client_CC, "LibEv Thread");
//      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
        CtdlLogPrintf(CTDL_DEBUG, "client_ev_thread() initializing\n");

        event_base = ev_default_loop (EVFLAG_AUTO);

        /* the async watchers are how other code signals this loop */
        ev_async_init(&AddJob, QueueEventAddCallback);
        ev_async_start(event_base, &AddJob);
        ev_async_init(&ExitEventLoop, EventExitCallback);
        ev_async_start(event_base, &ExitEventLoop);
        ev_async_init(&WakeupCurl, WakeupCurlCallback);
        ev_async_start(event_base, &WakeupCurl);

        curl_init_connectionpool();

        /* returns once EventExitCallback() has called ev_unloop() */
        ev_loop (event_base, 0);


        CtdlClearSystemContext();
        ev_default_destroy ();
        
        /* tear down what InitEventQueue() created */
        DeleteHash(&QueueEvents);
        InboundEventQueue = NULL;
        DeleteHash(&InboundEventQueues[0]);
        DeleteHash(&InboundEventQueues[1]);
        citthread_mutex_destroy(&EventQueueMutex);


        return(NULL);
}
462
463 #endif
464
/*
 * Module entry point.  When built with EXPERIMENTAL_SMTP_EVENT_CLIENT and
 * called with `threading` unset, the event queue is initialised and the
 * event thread is created.  Always returns the module name "event".
 */
CTDL_MODULE_INIT(event_client)
{
#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
        if (threading)
                return "event";

        InitEventQueue();
        CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
        /* TODO: register shutdown callback. */
#endif
        return "event";
}