- port to Cygwin (DLL support, etc.)
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #ifdef DLL_EXPORT
22 #define IN_LIBCIT
23 #endif
24
25 #include "sysdep.h"
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <stdio.h>
29 #include <ctype.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <dirent.h>
35 #include <db.h>
36 #include <pthread.h>
37 #include "citadel.h"
38 #include "server.h"
39 #include "dynloader.h"
40 #include "citserver.h"
41 #include "database.h"
42 #include "sysdep_decls.h"
43
44 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
45 static DB_ENV *dbenv;           /* The DB environment (global) */
46
47 struct cdbtsd {                 /* Thread-specific DB stuff */
48         DB_TXN *tid;            /* Transaction handle */
49         DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
50 };
51
52 static pthread_key_t tsdkey;
53
54 #define MYCURSORS       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors)
55 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
56
57 /* just a little helper function */
58 static void txabort(DB_TXN *tid) {
59         int ret = txn_abort(tid);
60
61         if (ret) {
62                 lprintf(1, "cdb_*: txn_abort: %s\n", db_strerror(ret));
63                 abort();
64         }
65 }
66
67 /* this one is even more helpful than the last. */
68 static void txcommit(DB_TXN *tid) {
69         int ret = txn_commit(tid, 0);
70
71         if (ret) {
72                 lprintf(1, "cdb_*: txn_commit: %s\n", db_strerror(ret));
73                 abort();
74         }
75 }
76
77 /* are you sensing a pattern yet? */
78 static void txbegin(DB_TXN **tid) {
79         int ret = txn_begin(dbenv, NULL, tid, 0);
80
81         if (ret) {
82                 lprintf(1, "cdb_*: txn_begin: %s\n", db_strerror(ret));
83                 abort();
84         }
85 }
86
87 static void cclose(DBC *cursor) {
88         int ret;
89
90         if ((ret = cursor->c_close(cursor))) {
91                 lprintf(1, "cdb_*: c_close: %s\n", db_strerror(ret));
92                 abort();
93         }
94 }
95
96 static void bailIfCursor(DBC **cursors, const char *msg)
97 {
98   int i;
99
100   for (i = 0; i < MAXCDB; i++)
101     if (cursors[i] != NULL)
102       {
103         lprintf(1, "cdb_*: cursor still in progress on cdb %d: %s\n", i, msg);
104         abort();
105       }
106 }
107
108 static void check_handles(void *arg) {
109         if (arg != NULL) {
110                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
111
112                 bailIfCursor(tsd->cursors, "in check_handles");
113
114                 if (tsd->tid != NULL) {
115                         lprintf(1, "cdb_*: transaction still in progress!");
116                         abort();
117                 }
118         }
119 }
120
121 static void dest_tsd(void *arg) {
122         if (arg != NULL) {
123                 check_handles(arg);
124                 phree(arg);
125         }
126 }
127
128 /*
129  * Ensure that we have a key for thread-specific data.  We don't
130  * put anything in here that Citadel cares about; this is just database
131  * related stuff like cursors and transactions.
132  *
133  * This should be called immediately after startup by any thread which wants
134  * to use database calls, except for whatever thread calls open_databases.
135  */
136 void cdb_allocate_tsd(void) {
137         struct cdbtsd *tsd;
138
139         if (pthread_getspecific(tsdkey) != NULL)
140                 return;
141
142         tsd = mallok(sizeof *tsd);
143
144         tsd->tid = NULL;
145
146         memset(tsd->cursors, 0, sizeof tsd->cursors);
147         pthread_setspecific(tsdkey, tsd);
148 }
149
150 void cdb_free_tsd(void) {
151         dest_tsd(pthread_getspecific(tsdkey));
152         pthread_setspecific(tsdkey, NULL);
153 }
154
155 void cdb_check_handles(void) {
156         check_handles(pthread_getspecific(tsdkey));
157 }
158
159
160 /*
161  * Reclaim unused space in the databases.  We need to do each one of
162  * these discretely, rather than in a loop.
163  *
164  * This is a stub function in the Sleepycat DB backend, because there is no
165  * such API call available.
166  */
167 void defrag_databases(void)
168 {
169         /* do nothing */
170 }
171
172
173 /*
174  * Cull the database logs
175  */
176 static void cdb_cull_logs(void) {
177         DIR *dp;
178         struct dirent *d;
179         char filename[SIZ];
180         struct stat statbuf;
181
182         lprintf(5, "Database log file cull started.\n");
183
184         dp = opendir("data");
185         if (dp == NULL) return;
186
187         while (d = readdir(dp), d != NULL) {
188                 if (!strncasecmp(d->d_name, "log.", 4)) {
189                         sprintf(filename, "./data/%s", d->d_name);
190                         stat(filename, &statbuf);
191                         if ((time(NULL) - statbuf.st_mtime) > 432000L) {
192                                 lprintf(5, "%s ... deleted\n", filename);
193                                 unlink(filename);
194                         }
195                         else {
196                                 lprintf(5, "%s ... kept\n", filename);
197                         }
198                 }
199         }
200
201         closedir(dp);
202
203         lprintf(5, "Database log file cull ended.\n");
204 }
205
206
207 /*
208  * Request a checkpoint of the database.
209  */
210 static void cdb_checkpoint(void) {
211         int ret;
212         static time_t last_cull = 0L;
213
214         ret = txn_checkpoint(dbenv,
215                                 MAX_CHECKPOINT_KBYTES,
216                                 MAX_CHECKPOINT_MINUTES,
217                                 0);
218         if ( (ret != 0) && (ret != DB_INCOMPLETE) ) {
219                 lprintf(1, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
220                 abort();
221         }
222
223         if (ret == DB_INCOMPLETE) {
224                 lprintf(3, "WARNING: txn_checkpoint: %s\n", db_strerror(ret));
225         }
226
227         /* Cull the logs if we haven't done so for 24 hours */
228         if ((time(NULL) - last_cull) > 86400L) {
229                 last_cull = time(NULL);
230                 cdb_cull_logs();
231         }
232
233 }
234
235 /*
236  * Open the various databases we'll be using.  Any database which
237  * does not exist should be created.  Note that we don't need an S_DATABASE
238  * critical section here, because there aren't any active threads manipulating
239  * the database yet -- and besides, it causes problems on BSDI.
240  */
241 void open_databases(void)
242 {
243         int ret;
244         int i;
245         char dbfilename[SIZ];
246         u_int32_t flags = 0;
247
248         lprintf(9, "cdb_*: open_databases() starting\n");
249         /*
250          * Silently try to create the database subdirectory.  If it's
251          * already there, no problem.
252          */
253         system("exec mkdir data 2>/dev/null");
254
255         lprintf(9, "cdb_*: Setting up DB environment\n");
256         db_env_set_func_yield(sched_yield);
257         ret = db_env_create(&dbenv, 0);
258         if (ret) {
259                 lprintf(1, "cdb_*: db_env_create: %s\n", db_strerror(ret));
260                 exit(ret);
261         }
262         dbenv->set_errpfx(dbenv, "citserver");
263
264         /*
265          * We want to specify the shared memory buffer pool cachesize,
266          * but everything else is the default.
267          */
268         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
269         if (ret) {
270                 lprintf(1, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
271                 dbenv->close(dbenv, 0);
272                 exit(ret);
273         }
274
275         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
276                 lprintf(1, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
277                 dbenv->close(dbenv, 0);
278                 exit(ret);
279         }
280
281         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
282                 DB_INIT_LOCK|DB_THREAD;
283         ret = dbenv->open(dbenv, "./data", flags, 0);
284         if (ret) {
285                 lprintf(1, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
286                 dbenv->close(dbenv, 0);
287                 exit(ret);
288         }
289
290         lprintf(7, "cdb_*: Starting up DB\n");
291
292         for (i = 0; i < MAXCDB; ++i) {
293
294                 /* Create a database handle */
295                 ret = db_create(&dbp[i], dbenv, 0);
296                 if (ret) {
297                         lprintf(1, "cdb_*: db_create: %s\n", db_strerror(ret));
298                         exit(ret);
299                 }
300
301
302                 /* Arbitrary names for our tables -- we reference them by
303                  * number, so we don't have string names for them.
304                  */
305                 sprintf(dbfilename, "cdb.%02x", i);
306
307                 ret = dbp[i]->open(dbp[i],
308                                 dbfilename,
309                                 NULL,
310                                 DB_BTREE,
311                                 DB_CREATE|DB_THREAD,
312                                 0600);
313                 if (ret) {
314                         lprintf(1, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
315                         exit(ret);
316                 }
317         }
318
319         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
320                 lprintf(1, "cdb_*: pthread_key_create: %s\n", strerror(ret));
321                 exit(1);
322         }
323
324         cdb_allocate_tsd();
325         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
326         lprintf(9, "cdb_*: open_databases() finished\n");
327 }
328
329
330 /*
331  * Close all of the db database files we've opened.  This can be done
332  * in a loop, since it's just a bunch of closes.
333  */
334 void close_databases(void)
335 {
336         int a;
337         int ret;
338
339         cdb_free_tsd();
340
341         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
342                 lprintf(1, "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
343                 abort();
344         }
345
346         for (a = 0; a < MAXCDB; ++a) {
347                 lprintf(7, "cdb_*: Closing database %d\n", a);
348                 ret = dbp[a]->close(dbp[a], 0);
349                 if (ret) {
350                         lprintf(1, "cdb_*: db_close: %s\n", db_strerror(ret));
351                         abort();
352                 }
353                 
354         }
355
356         /* Close the handle. */
357         ret = dbenv->close(dbenv, 0);
358         if (ret) {
359                 lprintf(1, "cdb_*: DBENV->close: %s\n", db_strerror(ret));
360                 abort();
361         }
362 }
363
364 /*
365  * Store a piece of data.  Returns 0 if the operation was successful.  If a
366  * key already exists it should be overwritten.
367  */
368 int cdb_store(int cdb,
369               void *ckey, int ckeylen,
370               void *cdata, int cdatalen)
371 {
372
373   DBT dkey, ddata;
374   DB_TXN *tid;
375   int ret;
376   
377   memset(&dkey, 0, sizeof(DBT));
378   memset(&ddata, 0, sizeof(DBT));
379   dkey.size = ckeylen;
380   dkey.data = ckey;
381   ddata.size = cdatalen;
382   ddata.data = cdata;
383   
384   if (MYTID != NULL)
385     {
386       ret = dbp[cdb]->put(dbp[cdb],             /* db */
387                           MYTID,                /* transaction ID */
388                           &dkey,                /* key */
389                           &ddata,               /* data */
390                           0);           /* flags */
391       if (ret)
392         {
393           lprintf(1, "cdb_store(%d): %s\n", cdb,
394                   db_strerror(ret));
395           abort();
396         }
397       return ret;
398       
399     }
400   else
401     {
402       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
403       
404     retry:
405       txbegin(&tid);
406       
407       if ((ret = dbp[cdb]->put(dbp[cdb],    /* db */
408                                tid,         /* transaction ID */
409                                &dkey,       /* key */
410                                &ddata,      /* data */
411                                0)))         /* flags */
412         {
413           if (ret == DB_LOCK_DEADLOCK)
414             {
415               txabort(tid);
416               goto retry;
417             }
418           else
419             {
420               lprintf(1, "cdb_store(%d): %s\n", cdb,
421                       db_strerror(ret));
422               abort();
423             }
424         }
425       else
426         {
427           txcommit(tid);
428           return ret;
429         }
430     }
431 }
432
433
434 /*
435  * Delete a piece of data.  Returns 0 if the operation was successful.
436  */
437 int cdb_delete(int cdb, void *key, int keylen)
438 {
439
440   DBT dkey;
441   DB_TXN *tid;
442   int ret;
443
444   memset(&dkey, 0, sizeof dkey);
445   dkey.size = keylen;
446   dkey.data = key;
447
448   if (MYTID != NULL)
449     {
450       ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
451       if (ret)
452         {
453           lprintf(1, "cdb_delete(%d): %s\n", cdb,
454                   db_strerror(ret));
455           if (ret != DB_NOTFOUND)
456             abort();
457         }
458     }
459   else
460     {
461       bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
462     
463     retry:
464       txbegin(&tid);
465     
466       if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
467           && ret != DB_NOTFOUND)
468         {
469           if (ret == DB_LOCK_DEADLOCK)
470             {
471               txabort(tid);
472               goto retry;
473             }
474           else
475             {
476               lprintf(1, "cdb_delete(%d): %s\n", cdb,
477                       db_strerror(ret));
478               abort();
479             }
480         }
481       else
482         {
483           txcommit(tid);
484         }
485     }
486   return ret;
487 }
488
489 static DBC *localcursor(int cdb)
490 {
491   int ret;
492   DBC *curs;
493
494   if (MYCURSORS[cdb] == NULL)
495     ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
496   else
497     ret = MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, DB_POSITION);
498
499   if (ret)
500     {
501       lprintf(1, "localcursor: %s\n", db_strerror(ret));
502       abort();
503     }
504
505   return curs;
506 }
507
508
509 /*
510  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
511  * a struct cdbdata which it is the caller's responsibility to free later on
512  * using the cdb_free() routine.
513  */
514 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
515 {
516
517   struct cdbdata *tempcdb;
518   DBT dkey, dret;
519   int ret;
520
521   memset(&dkey, 0, sizeof(DBT));
522   dkey.size = keylen;
523   dkey.data = key;
524
525   if (MYTID != NULL)
526     {
527       memset(&dret, 0, sizeof(DBT));
528       dret.flags = DB_DBT_MALLOC;
529       ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
530     }
531   else
532     {
533       DBC *curs;
534
535       do
536         {
537           memset(&dret, 0, sizeof(DBT));
538           dret.flags = DB_DBT_MALLOC;
539
540           curs = localcursor(cdb);
541
542           ret = curs->c_get(curs, &dkey, &dret, DB_SET);
543           cclose(curs);
544         }
545       while (ret == DB_LOCK_DEADLOCK);
546
547     }
548
549   if ((ret != 0) && (ret != DB_NOTFOUND))
550     {
551       lprintf(1, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
552       abort();
553     }
554
555   if (ret != 0) return NULL;
556   tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
557
558   if (tempcdb == NULL)
559     {
560       lprintf(2, "cdb_fetch: Cannot allocate memory for tempcdb\n");
561       abort();
562     }
563
564   tempcdb->len = dret.size;
565   tempcdb->ptr = dret.data;
566   return (tempcdb);
567 }
568
569
570 /*
571  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
572  * more complex stuff with other database managers in the future).
573  */
574 void cdb_free(struct cdbdata *cdb)
575 {
576         phree(cdb->ptr);
577         phree(cdb);
578 }
579
580 void cdb_close_cursor(int cdb)
581 {
582         if (MYCURSORS[cdb] != NULL)
583                 cclose(MYCURSORS[cdb]);
584
585         MYCURSORS[cdb] = NULL;
586 }
587
588 /* 
589  * Prepare for a sequential search of an entire database.
590  * (There is guaranteed to be no more than one traversal in
591  * progress per thread at any given time.)
592  */
593 void cdb_rewind(int cdb)
594 {
595         int ret = 0;
596
597         if (MYCURSORS[cdb] != NULL)
598                 cclose(MYCURSORS[cdb]);
599
600         /*
601          * Now initialize the cursor
602          */
603         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
604         if (ret) {
605                 lprintf(1, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
606                 abort();
607         }
608 }
609
610
611 /*
612  * Fetch the next item in a sequential search.  Returns a pointer to a 
613  * cdbdata structure, or NULL if we've hit the end.
614  */
615 struct cdbdata *cdb_next_item(int cdb)
616 {
617         DBT key, data;
618         struct cdbdata *cdbret;
619         int ret = 0;
620
621         /* Initialize the key/data pair so the flags aren't set. */
622         memset(&key, 0, sizeof(key));
623         memset(&data, 0, sizeof(data));
624         data.flags = DB_DBT_MALLOC;
625
626         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb],
627                 &key, &data, DB_NEXT);
628         
629         if (ret) {
630                 if (ret != DB_NOTFOUND) {
631                         lprintf(1, "cdb_next_item(%d): %s\n",
632                                 cdb, db_strerror(ret));
633                         abort();
634                 }
635                 cclose(MYCURSORS[cdb]);
636                 MYCURSORS[cdb] = NULL;
637                 return NULL;            /* presumably, end of file */
638         }
639
640         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
641         cdbret->len = data.size;
642         cdbret->ptr = data.data;
643
644         return (cdbret);
645 }
646
647
648 /*
649  * Transaction-based stuff.  I'm writing this as I bake cookies...
650  */
651
652 void cdb_begin_transaction(void) {
653
654   bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
655
656   if (MYTID != NULL)
657     {
658       lprintf(1, "cdb_begin_transaction: ERROR: nested transaction\n");
659       abort();
660     }
661
662   txbegin(&MYTID);
663 }
664
665 void cdb_end_transaction(void) {
666   int i;
667
668   for (i = 0; i < MAXCDB; i++)
669     if (MYCURSORS[i] != NULL) {
670       lprintf(1, "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n", i);
671       cclose(MYCURSORS[i]);
672       MYCURSORS[i] = NULL;
673     }
674
675   if (MYTID == NULL)
676     {
677       lprintf(1, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
678       abort();
679     }
680   else
681     txcommit(MYTID);
682
683   MYTID = NULL;
684 }
685