Removed the logging facility from citserver, use syslog instead
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
1 /*
2  * Copyright (c) 1998-2009 by the citadel.org team
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; either version 3 of the License, or
7  *  (at your option) any later version.
8  *
9  *  This program is distributed in the hope that it will be useful,
10  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *  GNU General Public License for more details.
13  *
14  *  You should have received a copy of the GNU General Public License
15  *  along with this program; if not, write to the Free Software
16  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  */
18
19 #include "sysdep.h"
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <stdio.h>
23 #include <termios.h>
24 #include <fcntl.h>
25 #include <signal.h>
26 #include <pwd.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <syslog.h>
30
31 #if TIME_WITH_SYS_TIME
32 # include <sys/time.h>
33 # include <time.h>
34 #else
35 # if HAVE_SYS_TIME_H
36 #  include <sys/time.h>
37 # else
38 #  include <time.h>
39 # endif
40 #endif
41 #include <sys/wait.h>
42 #include <ctype.h>
43 #include <string.h>
44 #include <limits.h>
45 #include <sys/socket.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <libcitadel.h>
49 #include <curl/curl.h>
50 #include <curl/multi.h>
51 #include "citadel.h"
52 #include "server.h"
53 #include "citserver.h"
54 #include "support.h"
55
56 #include "ctdl_module.h"
57
58 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
59
60 #include "event_client.h"
61 #include "serv_curl.h"
62
63 ev_loop *event_base;
64
65 /*****************************************************************************
66  *                   libevent / curl integration                             *
67  *****************************************************************************/
68 #define MOPT(s, v)                                                      \
69         do {                                                            \
70                 sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));     \
71                 if (sta) {                                              \
72                         syslog(LOG_ERR, "EVCURL: error setting option " #s " on curl multi handle: %s\n", curl_easy_strerror(sta)); \
73                         exit (1);                                       \
74                 }                                                       \
75         } while (0)
76
77 typedef struct _evcurl_global_data {
78         int magic;
79         CURLM *mhnd;
80         ev_timer timeev;
81         int nrun;
82 } evcurl_global_data;
83
84 typedef struct _sockwatcher_data 
85 {
86         evcurl_global_data *global;
87         ev_io ioev;
88 } sockwatcher_data;
89
90 ev_async WakeupCurl;
91 evcurl_global_data global;
92
93 static void
94 gotstatus(evcurl_global_data *global, int nnrun) 
95 {
96         CURLM *mhnd;
97         CURLMsg *msg;
98         int nmsg;
99 /*
100   if (EVCURL_GLOBAL_MAGIC != global.magic)
101   {
102   CtdlLogPrintf(CTDL_ERR, "internal error: gotstatus on wrong struct");
103   return;
104   }
105 */
106         global->nrun = nnrun;
107         mhnd = global->mhnd;
108
109         syslog(LOG_DEBUG, "CURLEV: gotstatus(): about to call curl_multi_info_read\n");
110         while ((msg = curl_multi_info_read(mhnd, &nmsg))) {
111                 syslog(LOG_ERR, "EVCURL: got curl multi_info message msg=%d\n", msg->msg);
112                 if (CURLMSG_DONE == msg->msg) {
113                         CURL *chnd;
114                         char *chandle;
115                         CURLcode sta;
116                         CURLMcode msta;
117                         AsyncIO  *IO;
118
119                         chandle = NULL;;
120                         chnd = msg->easy_handle;
121                         sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle);
122                         syslog(LOG_ERR, "EVCURL: request complete\n");
123                         if (sta)
124                                 syslog(LOG_ERR, "EVCURL: error asking curl for private cookie of curl handle: %s\n", curl_easy_strerror(sta));
125                         IO = (AsyncIO *)chandle;
126                         
127                         sta = msg->data.result;
128                         if (sta) {
129                                 syslog(LOG_ERR, "EVCURL: error description: %s\n", IO->HttpReq.errdesc);
130                                 syslog(LOG_ERR, "EVCURL: error performing request: %s\n", curl_easy_strerror(sta));
131                         }
132                         sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &IO->HttpReq.httpcode);
133                         if (sta)
134                                 syslog(LOG_ERR, "EVCURL: error asking curl for response code from request: %s\n", curl_easy_strerror(sta));
135                         syslog(LOG_ERR, "EVCURL: http response code was %ld\n", (long)IO->HttpReq.httpcode);
136                         msta = curl_multi_remove_handle(mhnd, chnd);
137                         if (msta)
138                                 syslog(LOG_ERR, "EVCURL: warning problem detaching completed handle from curl multi: %s\n", curl_multi_strerror(msta));
139
140                         IO->HttpReq.attached = 0;
141                         IO->SendDone(IO);
142                         curl_easy_cleanup(IO->HttpReq.chnd);
143                         curl_slist_free_all(IO->HttpReq.headers);
144                         FreeStrBuf(&IO->HttpReq.ReplyData);
145                         FreeURL(&IO->ConnectMe);
146                         RemoveContext(IO->CitContext);
147                         IO->Terminate(IO);
148                 }
149         }
150 }
151
152 static void
153 stepmulti(evcurl_global_data *global, curl_socket_t fd) {
154         int nnrun;
155         CURLMcode msta;
156         
157         if (global == NULL) {
158             syslog(LOG_DEBUG, "EVCURL: stepmulti(NULL): wtf?\n");
159             return;
160         }
161         msta = curl_multi_socket_action(global->mhnd, fd, 0, &nnrun);
162         syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n");
163         if (msta)
164                 syslog(LOG_ERR, "EVCURL: error in curl processing events on multi handle, fd %d: %s\n", (int)fd, curl_multi_strerror(msta));
165         if (global->nrun != nnrun)
166                 gotstatus(global, nnrun);
167 }
168
169 static void
170 gottime(struct ev_loop *loop, ev_timer *timeev, int events) {
171         syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
172         evcurl_global_data *global = (void *)timeev->data;
173         stepmulti(global, CURL_SOCKET_TIMEOUT);
174 }
175
176 static void
177 gotio(struct ev_loop *loop, ev_io *ioev, int events) {
178         syslog(LOG_DEBUG, "EVCURL: waking up curl for io on fd %d\n", (int)ioev->fd);
179         sockwatcher_data *sockwatcher = (void *)ioev->data;
180         stepmulti(sockwatcher->global, ioev->fd);
181 }
182
183 static size_t
184 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
185         AsyncIO *IO = (AsyncIO*) cglobal;
186         //evcurl_request_data *D = (evcurl_request_data*) data;
187         syslog(LOG_DEBUG, "EVCURL: gotdata(): calling CurlFillStrBuf_callback()\n");
188
189         if (IO->HttpReq.ReplyData == NULL)
190         {
191             IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
192         }
193         return CurlFillStrBuf_callback(data, size, nmemb, IO->HttpReq.ReplyData);
194 }
195
196 static int
197 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
198         syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
199         evcurl_global_data *global = cglobal;
200         ev_timer_stop(EV_DEFAULT, &global->timeev);
201         if (tblock_ms < 0 || 14000 < tblock_ms)
202                 tblock_ms = 14000;
203         ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
204         ev_timer_start(EV_DEFAULT_UC, &global->timeev);
205         curl_multi_perform(global, CURL_POLL_NONE);
206         return 0;
207 }
208
209 static int
210 gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *csockwatcher) {
211         evcurl_global_data *global = cglobal;
212         CURLM *mhnd = global->mhnd;
213         sockwatcher_data *sockwatcher = csockwatcher;
214
215         syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
216
217         if (!sockwatcher) {
218                 syslog(LOG_ERR,"EVCURL: called first time to register this sockwatcker\n");
219                 sockwatcher = malloc(sizeof(sockwatcher_data));
220                 sockwatcher->global = global;
221                 ev_init(&sockwatcher->ioev, &gotio);
222                 sockwatcher->ioev.data = (void *)sockwatcher;
223                 curl_multi_assign(mhnd, fd, sockwatcher);
224         }
225         if (CURL_POLL_REMOVE == action) {
226                 syslog(LOG_ERR,"EVCURL: called last time to unregister this sockwatcher\n");
227                 ev_io_stop(event_base, &sockwatcher->ioev);
228                 free(sockwatcher);
229         } else {
230                 int events = (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0);
231                 ev_io_stop(EV_DEFAULT, &sockwatcher->ioev);
232                 if (events) {
233                         ev_io_set(&sockwatcher->ioev, fd, events);
234                         ev_io_start(EV_DEFAULT, &sockwatcher->ioev);
235                 }
236         }
237         return 0;
238 }
239
240 void curl_init_connectionpool(void) 
241 {
242         CURLM *mhnd ;
243
244         ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
245         global.timeev.data = (void *)&global;
246         global.nrun = -1;
247         CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
248
249         if (sta) 
250         {
251                 syslog(LOG_ERR,"EVCURL: error initializing curl library: %s\n", curl_easy_strerror(sta));
252                 exit(1);
253         }
254         mhnd = global.mhnd = curl_multi_init();
255         if (!mhnd)
256         {
257                 syslog(LOG_ERR,"EVCURL: error initializing curl multi handle\n");
258                 exit(3);
259         }
260
261         MOPT(SOCKETFUNCTION, &gotwatchsock);
262         MOPT(SOCKETDATA, (void *)&global);
263         MOPT(TIMERFUNCTION, &gotwatchtime);
264         MOPT(TIMERDATA, (void *)&global);
265
266         return;
267 }
268
269
270
271
272 int evcurl_init(AsyncIO *IO, 
273                 void *CustomData, 
274                 const char* Desc,
275                 IO_CallBack CallBack, 
276                 IO_CallBack Terminate)
277 {
278         CURLcode sta;
279         CURL *chnd;
280
281         syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
282         IO->HttpReq.attached = 0;
283         IO->SendDone = CallBack;
284         IO->Terminate = Terminate;
285         chnd = IO->HttpReq.chnd = curl_easy_init();
286         if (!chnd)
287         {
288                 syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
289                 return 1;
290         }
291
292         strcpy(IO->HttpReq.errdesc, Desc);
293
294         OPT(VERBOSE, (long)1);
295                 /* unset in production */
296         OPT(NOPROGRESS, (long)1); 
297         OPT(NOSIGNAL, (long)1);
298         OPT(FAILONERROR, (long)1);
299         OPT(ENCODING, "");
300         OPT(FOLLOWLOCATION, (long)1);
301         OPT(MAXREDIRS, (long)7);
302         OPT(USERAGENT, CITADEL);
303
304         OPT(TIMEOUT, (long)1800);
305         OPT(LOW_SPEED_LIMIT, (long)64);
306         OPT(LOW_SPEED_TIME, (long)600);
307         OPT(CONNECTTIMEOUT, (long)600); 
308         OPT(PRIVATE, (void *)IO);
309
310
311         OPT(WRITEFUNCTION, &gotdata); 
312         OPT(WRITEDATA, (void *)IO);
313         OPT(ERRORBUFFER, IO->HttpReq.errdesc);
314
315         if (
316                 (!IsEmptyStr(config.c_ip_addr))
317                 && (strcmp(config.c_ip_addr, "*"))
318                 && (strcmp(config.c_ip_addr, "::"))
319                 && (strcmp(config.c_ip_addr, "0.0.0.0"))
320         ) {
321                 OPT(INTERFACE, config.c_ip_addr);
322         }
323                 /* point to a structure that points back to the perl structure and stuff */
324         syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
325         OPT(URL, IO->ConnectMe->PlainUrl);
326         if (StrLength(IO->ConnectMe->CurlCreds))
327         {
328                 OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
329                 OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
330         }
331 #ifdef CURLOPT_HTTP_CONTENT_DECODING
332         OPT(HTTP_CONTENT_DECODING, 1);
333         OPT(ENCODING, "");
334 #endif
335         if (StrLength(IO->HttpReq.PostData) > 0)
336         { 
337                 OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
338                 OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
339
340         }
341         else if ((IO->HttpReq.PlainPostDataLen != 0) && (IO->HttpReq.PlainPostData != NULL))
342         {
343                 OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
344                 OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
345         }
346
347         if (IO->HttpReq.headers != NULL)
348                 OPT(HTTPHEADER, IO->HttpReq.headers);
349
350         return 1;
351 }
352
353 void
354 evcurl_handle_start(AsyncIO *IO) 
355 {
356         CURLMcode msta;
357         
358         syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
359         msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
360         if (msta)
361                 syslog(LOG_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta));
362         IO->HttpReq.attached = 1;
363         ev_async_send (event_base, &WakeupCurl);
364 }
365
366 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
367 {
368         syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n");
369
370         curl_multi_perform(&global, CURL_POLL_NONE);
371 }
372
373 static void evcurl_shutdown (void)
374 {
375         curl_multi_cleanup(global.mhnd);
376 }
377 /*****************************************************************************
378  *                       libevent integration                                *
379  *****************************************************************************/
380 /*
381  * client event queue plus its methods.
382  * this currently is the main loop (which may change in some future?)
383  */
384 int evbase_count = 0;
385 int event_add_pipe[2] = {-1, -1};
386 citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
387 HashList *QueueEvents = NULL;
388 HashList *InboundEventQueue = NULL;
389 HashList *InboundEventQueues[2] = { NULL, NULL };
390
391 ev_async AddJob;   
392 ev_async ExitEventLoop;
393
394 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
395 {
396         HashList *q;
397         void *v;
398         HashPos  *It;
399         long len;
400         const char *Key;
401
402         /* get the control command... */
403         citthread_mutex_lock(&EventQueueMutex);
404
405         if (InboundEventQueues[0] == InboundEventQueue) {
406                 InboundEventQueue = InboundEventQueues[1];
407                 q = InboundEventQueues[0];
408         }
409         else {
410                 InboundEventQueue = InboundEventQueues[0];
411                 q = InboundEventQueues[1];
412         }
413         citthread_mutex_unlock(&EventQueueMutex);
414
415         It = GetNewHashPos(q, 0);
416         while (GetNextHashPos(q, It, &len, &Key, &v))
417         {
418                 IOAddHandler *h = v;
419                 h->EvAttch(h->IO);
420         }
421         DeleteHashPos(&It);
422         DeleteHashContent(&q);
423         syslog(LOG_DEBUG, "EVENT Q Read done.\n");
424 }
425
426
427 static void EventExitCallback(EV_P_ ev_async *w, int revents)
428 {
429         ev_break(event_base, EVBREAK_ALL);
430
431         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
432 }
433
434
435
436 void InitEventQueue(void)
437 {
438         struct rlimit LimitSet;
439
440         citthread_mutex_init(&EventQueueMutex, NULL);
441
442         if (pipe(event_add_pipe) != 0) {
443                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
444                 abort();
445         }
446         LimitSet.rlim_cur = 1;
447         LimitSet.rlim_max = 1;
448         setrlimit(event_add_pipe[1], &LimitSet);
449
450         QueueEvents = NewHash(1, Flathash);
451         InboundEventQueues[0] = NewHash(1, Flathash);
452         InboundEventQueues[1] = NewHash(1, Flathash);
453         InboundEventQueue = InboundEventQueues[0];
454 }
455 /*
456  * this thread operates the select() etc. via libev.
457  * 
458  * 
459  */
460 void *client_event_thread(void *arg) 
461 {
462         struct CitContext libev_client_CC;
463
464         CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
465 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
466         syslog(LOG_DEBUG, "client_ev_thread() initializing\n");
467
468         event_base = ev_default_loop (EVFLAG_AUTO);
469
470         ev_async_init(&AddJob, QueueEventAddCallback);
471         ev_async_start(event_base, &AddJob);
472         ev_async_init(&ExitEventLoop, EventExitCallback);
473         ev_async_start(event_base, &ExitEventLoop);
474         ev_async_init(&WakeupCurl, WakeupCurlCallback);
475         ev_async_start(event_base, &WakeupCurl);
476
477         curl_init_connectionpool();
478
479         ev_run (event_base, 0);
480
481
482 ///what todo here?      CtdlClearSystemContext();
483         ev_loop_destroy (EV_DEFAULT_UC);
484         
485         DeleteHash(&QueueEvents);
486         InboundEventQueue = NULL;
487         DeleteHash(&InboundEventQueues[0]);
488         DeleteHash(&InboundEventQueues[1]);
489         citthread_mutex_destroy(&EventQueueMutex);
490         evcurl_shutdown();
491
492         return(NULL);
493 }
494 /*------------------------------------------------------------------------------*/
495 /*
496  * DB-Queue; does async bdb operations.
497  * has its own set of handlers.
498  */
499 ev_loop *event_db;
500 int evdb_count = 0;
501 int evdb_add_pipe[2] = {-1, -1};
502 citthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
503 HashList *DBQueueEvents = NULL;
504 HashList *DBInboundEventQueue = NULL;
505 HashList *DBInboundEventQueues[2] = { NULL, NULL };
506
507 ev_async DBAddJob;   
508 ev_async DBExitEventLoop;
509
510 extern void ShutDownDBCLient(AsyncIO *IO);
511
512 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
513 {
514         HashList *q;
515         void *v;
516         HashPos  *It;
517         long len;
518         const char *Key;
519
520         /* get the control command... */
521         citthread_mutex_lock(&DBEventQueueMutex);
522
523         if (DBInboundEventQueues[0] == DBInboundEventQueue) {
524                 DBInboundEventQueue = DBInboundEventQueues[1];
525                 q = DBInboundEventQueues[0];
526         }
527         else {
528                 DBInboundEventQueue = DBInboundEventQueues[0];
529                 q = DBInboundEventQueues[1];
530         }
531         citthread_mutex_unlock(&DBEventQueueMutex);
532
533         It = GetNewHashPos(q, 0);
534         while (GetNextHashPos(q, It, &len, &Key, &v))
535         {
536                 IOAddHandler *h = v;
537                 eNextState rc;
538                 rc = h->EvAttch(h->IO);
539                 switch (rc)
540                 {
541                 case eAbort:
542                     ShutDownDBCLient(h->IO);
543                 default:
544                     break;
545                 }
546         }
547         DeleteHashPos(&It);
548         DeleteHashContent(&q);
549         syslog(LOG_DEBUG, "DBEVENT Q Read done.\n");
550 }
551
552
553 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
554 {
555         syslog(LOG_DEBUG, "EVENT Q exiting.\n");
556         ev_break(event_db, EVBREAK_ALL);
557 }
558
559
560
561 void DBInitEventQueue(void)
562 {
563         struct rlimit LimitSet;
564
565         citthread_mutex_init(&DBEventQueueMutex, NULL);
566
567         if (pipe(evdb_add_pipe) != 0) {
568                 syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
569                 abort();
570         }
571         LimitSet.rlim_cur = 1;
572         LimitSet.rlim_max = 1;
573         setrlimit(evdb_add_pipe[1], &LimitSet);
574
575         DBQueueEvents = NewHash(1, Flathash);
576         DBInboundEventQueues[0] = NewHash(1, Flathash);
577         DBInboundEventQueues[1] = NewHash(1, Flathash);
578         DBInboundEventQueue = DBInboundEventQueues[0];
579 }
580
581 /*
582  * this thread operates writing to the message database via libev.
583  * 
584  * 
585  */
586 void *db_event_thread(void *arg) 
587 {
588         struct CitContext libev_msg_CC;
589
590         CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
591 //      citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
592         syslog(LOG_DEBUG, "client_msgev_thread() initializing\n");
593
594         event_db = ev_loop_new (EVFLAG_AUTO);
595
596         ev_async_init(&DBAddJob, DBQueueEventAddCallback);
597         ev_async_start(event_db, &DBAddJob);
598         ev_async_init(&DBExitEventLoop, DBEventExitCallback);
599         ev_async_start(event_db, &DBExitEventLoop);
600
601         ev_run (event_db, 0);
602
603
604 //// what to do here?   CtdlClearSystemContext();
605         ev_loop_destroy (event_db);
606
607         DeleteHash(&DBQueueEvents);
608         DBInboundEventQueue = NULL;
609         DeleteHash(&DBInboundEventQueues[0]);
610         DeleteHash(&DBInboundEventQueues[1]);
611         citthread_mutex_destroy(&DBEventQueueMutex);
612
613         return(NULL);
614 }
615
616 #endif
617
618 CTDL_MODULE_INIT(event_client)
619 {
620 #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
621         if (!threading)
622         {
623                 InitEventQueue();
624                 DBInitEventQueue();
625                 CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL);
626                 CtdlThreadCreate("DB event", CTDLTHREAD_BIGSTACK, db_event_thread, NULL);
627 /// todo register shutdown callback.
628         }
629 #endif
630         return "event";
631 }