d98215036ac4c5710b9448a7819c14c2cf185ca0
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #ifdef DLL_EXPORT
22 #define IN_LIBCIT
23 #endif
24
25 #include "sysdep.h"
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <stdio.h>
29 #include <ctype.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <dirent.h>
35
36 #ifdef HAVE_DB4_DB_H
37 #include <db4/db.h>
38 #elif defined(HAVE_DB3_DB_H)
39 #include <db3/db.h>
40 #elif defined(HAVE_DB_H)
41 #include <db.h>
42 #else
43 #error Neither <db.h> nor <db3/db.h> was found by configure. Install db3-devel.
44 #endif
45
46 #include <pthread.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "dynloader.h"
50 #include "citserver.h"
51 #include "database.h"
52 #include "sysdep_decls.h"
53
54 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
55 static DB_ENV *dbenv;           /* The DB environment (global) */
56
57 struct cdbtsd {                 /* Thread-specific DB stuff */
58         DB_TXN *tid;            /* Transaction handle */
59         DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
60 };
61
62 static pthread_key_t tsdkey;
63
64 #define MYCURSORS       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors)
65 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
66
67 /* just a little helper function */
68 static void txabort(DB_TXN *tid) {
69         int ret = txn_abort(tid);
70
71         if (ret) {
72                 lprintf(1, "cdb_*: txn_abort: %s\n", db_strerror(ret));
73                 abort();
74         }
75 }
76
77 /* this one is even more helpful than the last. */
78 static void txcommit(DB_TXN *tid) {
79         int ret = txn_commit(tid, 0);
80
81         if (ret) {
82                 lprintf(1, "cdb_*: txn_commit: %s\n", db_strerror(ret));
83                 abort();
84         }
85 }
86
87 /* are you sensing a pattern yet? */
88 static void txbegin(DB_TXN **tid) {
89         int ret = txn_begin(dbenv, NULL, tid, 0);
90
91         if (ret) {
92                 lprintf(1, "cdb_*: txn_begin: %s\n", db_strerror(ret));
93                 abort();
94         }
95 }
96
97 static void cclose(DBC *cursor) {
98         int ret;
99
100         if ((ret = cursor->c_close(cursor))) {
101                 lprintf(1, "cdb_*: c_close: %s\n", db_strerror(ret));
102                 abort();
103         }
104 }
105
106 static void bailIfCursor(DBC **cursors, const char *msg)
107 {
108   int i;
109
110   for (i = 0; i < MAXCDB; i++)
111     if (cursors[i] != NULL)
112       {
113         lprintf(1, "cdb_*: cursor still in progress on cdb %d: %s\n", i, msg);
114         abort();
115       }
116 }
117
118 static void check_handles(void *arg) {
119         if (arg != NULL) {
120                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
121
122                 bailIfCursor(tsd->cursors, "in check_handles");
123
124                 if (tsd->tid != NULL) {
125                         lprintf(1, "cdb_*: transaction still in progress!");
126                         abort();
127                 }
128         }
129 }
130
131 static void dest_tsd(void *arg) {
132         if (arg != NULL) {
133                 check_handles(arg);
134                 phree(arg);
135         }
136 }
137
138 /*
139  * Ensure that we have a key for thread-specific data.  We don't
140  * put anything in here that Citadel cares about; this is just database
141  * related stuff like cursors and transactions.
142  *
143  * This should be called immediately after startup by any thread which wants
144  * to use database calls, except for whatever thread calls open_databases.
145  */
146 void cdb_allocate_tsd(void) {
147         struct cdbtsd *tsd;
148
149         if (pthread_getspecific(tsdkey) != NULL)
150                 return;
151
152         tsd = mallok(sizeof *tsd);
153
154         tsd->tid = NULL;
155
156         memset(tsd->cursors, 0, sizeof tsd->cursors);
157         pthread_setspecific(tsdkey, tsd);
158 }
159
160 void cdb_free_tsd(void) {
161         dest_tsd(pthread_getspecific(tsdkey));
162         pthread_setspecific(tsdkey, NULL);
163 }
164
165 void cdb_check_handles(void) {
166         check_handles(pthread_getspecific(tsdkey));
167 }
168
169
170 /*
171  * Reclaim unused space in the databases.  We need to do each one of
172  * these discretely, rather than in a loop.
173  *
174  * This is a stub function in the Sleepycat DB backend, because there is no
175  * such API call available.
176  */
177 void defrag_databases(void)
178 {
179         /* do nothing */
180 }
181
182
183 /*
184  * Cull the database logs
185  */
186 static void cdb_cull_logs(void) {
187         u_int32_t flags;
188         int ret;
189         char **file, **list;
190
191         lprintf(5, "Database log file cull started.\n");
192
193         flags = DB_ARCH_ABS;
194
195         /* Get the list of names. */
196 #if DB_VERSION_MAJOR == 3 && DB_VERSION_MINOR < 3
197         if ((ret = log_archive(dbenv, &list, flags, NULL)) != 0) {
198 #elif DB_VERSION_MAJOR >= 4
199         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
200 #else
201         if ((ret = log_archive(dbenv, &list, flags)) != 0) {
202 #endif
203                 lprintf(1, "cdb_cull_logs: %s\n", db_strerror(ret));
204                 return;
205         }
206
207         /* Print the list of names. */
208         if (list != NULL) {
209                 for (file = list; *file != NULL; ++file) {
210                         lprintf(9, "Deleting log: %s\n", *file);
211                         unlink(*file);
212                 }
213                 free(list);
214         }
215
216         lprintf(5, "Database log file cull ended.\n");
217 }
218
219
220 /*
221  * Request a checkpoint of the database.
222  */
223 static void cdb_checkpoint(void) {
224         int ret;
225         static time_t last_cull = 0L;
226
227 #if DB_VERSION_MAJOR >= 4
228         ret = dbenv->txn_checkpoint(dbenv,
229 #else
230         ret = txn_checkpoint(dbenv,
231 #endif
232                                 MAX_CHECKPOINT_KBYTES,
233                                 MAX_CHECKPOINT_MINUTES,
234                                 0);
235         if ( (ret != 0) && (ret != DB_INCOMPLETE) ) {
236                 lprintf(1, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
237                 abort();
238         }
239
240         if (ret == DB_INCOMPLETE) {
241                 lprintf(3, "WARNING: txn_checkpoint: %s\n", db_strerror(ret));
242         }
243
244         /* Cull the logs if we haven't done so for 24 hours */
245         if ((time(NULL) - last_cull) > 86400L) {
246                 last_cull = time(NULL);
247                 cdb_cull_logs();
248         }
249
250 }
251
252 /*
253  * Open the various databases we'll be using.  Any database which
254  * does not exist should be created.  Note that we don't need an S_DATABASE
255  * critical section here, because there aren't any active threads manipulating
256  * the database yet -- and besides, it causes problems on BSDI.
257  */
258 void open_databases(void)
259 {
260         int ret;
261         int i;
262         char dbfilename[SIZ];
263         u_int32_t flags = 0;
264
265         lprintf(9, "cdb_*: open_databases() starting\n");
266         /*
267          * Silently try to create the database subdirectory.  If it's
268          * already there, no problem.
269          */
270         system("exec mkdir data 2>/dev/null");
271
272         lprintf(9, "cdb_*: Setting up DB environment\n");
273         db_env_set_func_yield(sched_yield);
274         ret = db_env_create(&dbenv, 0);
275         if (ret) {
276                 lprintf(1, "cdb_*: db_env_create: %s\n", db_strerror(ret));
277                 exit(ret);
278         }
279         dbenv->set_errpfx(dbenv, "citserver");
280
281         /*
282          * We want to specify the shared memory buffer pool cachesize,
283          * but everything else is the default.
284          */
285         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
286         if (ret) {
287                 lprintf(1, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
288                 dbenv->close(dbenv, 0);
289                 exit(ret);
290         }
291
292         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
293                 lprintf(1, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
294                 dbenv->close(dbenv, 0);
295                 exit(ret);
296         }
297
298         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
299                 DB_INIT_LOCK|DB_THREAD;
300         ret = dbenv->open(dbenv, "./data", flags, 0);
301         if (ret) {
302                 lprintf(1, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
303                 dbenv->close(dbenv, 0);
304                 exit(ret);
305         }
306
307         lprintf(7, "cdb_*: Starting up DB\n");
308
309         for (i = 0; i < MAXCDB; ++i) {
310
311                 /* Create a database handle */
312                 ret = db_create(&dbp[i], dbenv, 0);
313                 if (ret) {
314                         lprintf(1, "cdb_*: db_create: %s\n", db_strerror(ret));
315                         exit(ret);
316                 }
317
318
319                 /* Arbitrary names for our tables -- we reference them by
320                  * number, so we don't have string names for them.
321                  */
322                 sprintf(dbfilename, "cdb.%02x", i);
323
324                 ret = dbp[i]->open(dbp[i],
325                                 dbfilename,
326                                 NULL,
327                                 DB_BTREE,
328                                 DB_CREATE|DB_THREAD,
329                                 0600);
330                 if (ret) {
331                         lprintf(1, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
332                         exit(ret);
333                 }
334         }
335
336         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
337                 lprintf(1, "cdb_*: pthread_key_create: %s\n", strerror(ret));
338                 exit(1);
339         }
340
341         cdb_allocate_tsd();
342         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
343         lprintf(9, "cdb_*: open_databases() finished\n");
344 }
345
346
347 /*
348  * Close all of the db database files we've opened.  This can be done
349  * in a loop, since it's just a bunch of closes.
350  */
351 void close_databases(void)
352 {
353         int a;
354         int ret;
355
356         cdb_free_tsd();
357
358 #if DB_VERSION_MAJOR >= 4
359         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
360 #else
361         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
362 #endif
363                 lprintf(1, "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
364                 abort();
365         }
366
367         for (a = 0; a < MAXCDB; ++a) {
368                 lprintf(7, "cdb_*: Closing database %d\n", a);
369                 ret = dbp[a]->close(dbp[a], 0);
370                 if (ret) {
371                         lprintf(1, "cdb_*: db_close: %s\n", db_strerror(ret));
372                         abort();
373                 }
374                 
375         }
376
377         /* Close the handle. */
378         ret = dbenv->close(dbenv, 0);
379         if (ret) {
380                 lprintf(1, "cdb_*: DBENV->close: %s\n", db_strerror(ret));
381                 abort();
382         }
383 }
384
385 /*
386  * Store a piece of data.  Returns 0 if the operation was successful.  If a
387  * key already exists it should be overwritten.
388  */
389 int cdb_store(int cdb,
390               void *ckey, int ckeylen,
391               void *cdata, int cdatalen)
392 {
393
394   DBT dkey, ddata;
395   DB_TXN *tid;
396   int ret;
397   
398   memset(&dkey, 0, sizeof(DBT));
399   memset(&ddata, 0, sizeof(DBT));
400   dkey.size = ckeylen;
401   dkey.data = ckey;
402   ddata.size = cdatalen;
403   ddata.data = cdata;
404   
405   if (MYTID != NULL)
406     {
407       ret = dbp[cdb]->put(dbp[cdb],             /* db */
408                           MYTID,                /* transaction ID */
409                           &dkey,                /* key */
410                           &ddata,               /* data */
411                           0);           /* flags */
412       if (ret)
413         {
414           lprintf(1, "cdb_store(%d): %s\n", cdb,
415                   db_strerror(ret));
416           abort();
417         }
418       return ret;
419       
420     }
421   else
422     {
423       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
424       
425     retry:
426       txbegin(&tid);
427       
428       if ((ret = dbp[cdb]->put(dbp[cdb],    /* db */
429                                tid,         /* transaction ID */
430                                &dkey,       /* key */
431                                &ddata,      /* data */
432                                0)))         /* flags */
433         {
434           if (ret == DB_LOCK_DEADLOCK)
435             {
436               txabort(tid);
437               goto retry;
438             }
439           else
440             {
441               lprintf(1, "cdb_store(%d): %s\n", cdb,
442                       db_strerror(ret));
443               abort();
444             }
445         }
446       else
447         {
448           txcommit(tid);
449           return ret;
450         }
451     }
452 }
453
454
455 /*
456  * Delete a piece of data.  Returns 0 if the operation was successful.
457  */
458 int cdb_delete(int cdb, void *key, int keylen)
459 {
460
461   DBT dkey;
462   DB_TXN *tid;
463   int ret;
464
465   memset(&dkey, 0, sizeof dkey);
466   dkey.size = keylen;
467   dkey.data = key;
468
469   if (MYTID != NULL)
470     {
471       ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
472       if (ret)
473         {
474           lprintf(1, "cdb_delete(%d): %s\n", cdb,
475                   db_strerror(ret));
476           if (ret != DB_NOTFOUND)
477             abort();
478         }
479     }
480   else
481     {
482       bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
483     
484     retry:
485       txbegin(&tid);
486     
487       if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
488           && ret != DB_NOTFOUND)
489         {
490           if (ret == DB_LOCK_DEADLOCK)
491             {
492               txabort(tid);
493               goto retry;
494             }
495           else
496             {
497               lprintf(1, "cdb_delete(%d): %s\n", cdb,
498                       db_strerror(ret));
499               abort();
500             }
501         }
502       else
503         {
504           txcommit(tid);
505         }
506     }
507   return ret;
508 }
509
510 static DBC *localcursor(int cdb)
511 {
512   int ret;
513   DBC *curs;
514
515   if (MYCURSORS[cdb] == NULL)
516     ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
517   else
518     ret = MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, DB_POSITION);
519
520   if (ret)
521     {
522       lprintf(1, "localcursor: %s\n", db_strerror(ret));
523       abort();
524     }
525
526   return curs;
527 }
528
529
530 /*
531  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
532  * a struct cdbdata which it is the caller's responsibility to free later on
533  * using the cdb_free() routine.
534  */
535 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
536 {
537
538   struct cdbdata *tempcdb;
539   DBT dkey, dret;
540   int ret;
541
542   memset(&dkey, 0, sizeof(DBT));
543   dkey.size = keylen;
544   dkey.data = key;
545
546   if (MYTID != NULL)
547     {
548       memset(&dret, 0, sizeof(DBT));
549       dret.flags = DB_DBT_MALLOC;
550       ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
551     }
552   else
553     {
554       DBC *curs;
555
556       do
557         {
558           memset(&dret, 0, sizeof(DBT));
559           dret.flags = DB_DBT_MALLOC;
560
561           curs = localcursor(cdb);
562
563           ret = curs->c_get(curs, &dkey, &dret, DB_SET);
564           cclose(curs);
565         }
566       while (ret == DB_LOCK_DEADLOCK);
567
568     }
569
570   if ((ret != 0) && (ret != DB_NOTFOUND))
571     {
572       lprintf(1, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
573       abort();
574     }
575
576   if (ret != 0) return NULL;
577   tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
578
579   if (tempcdb == NULL)
580     {
581       lprintf(2, "cdb_fetch: Cannot allocate memory for tempcdb\n");
582       abort();
583     }
584
585   tempcdb->len = dret.size;
586   tempcdb->ptr = dret.data;
587   return (tempcdb);
588 }
589
590
591 /*
592  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
593  * more complex stuff with other database managers in the future).
594  */
595 void cdb_free(struct cdbdata *cdb)
596 {
597         phree(cdb->ptr);
598         phree(cdb);
599 }
600
601 void cdb_close_cursor(int cdb)
602 {
603         if (MYCURSORS[cdb] != NULL)
604                 cclose(MYCURSORS[cdb]);
605
606         MYCURSORS[cdb] = NULL;
607 }
608
609 /* 
610  * Prepare for a sequential search of an entire database.
611  * (There is guaranteed to be no more than one traversal in
612  * progress per thread at any given time.)
613  */
614 void cdb_rewind(int cdb)
615 {
616         int ret = 0;
617
618         if (MYCURSORS[cdb] != NULL)
619                 cclose(MYCURSORS[cdb]);
620
621         /*
622          * Now initialize the cursor
623          */
624         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
625         if (ret) {
626                 lprintf(1, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
627                 abort();
628         }
629 }
630
631
632 /*
633  * Fetch the next item in a sequential search.  Returns a pointer to a 
634  * cdbdata structure, or NULL if we've hit the end.
635  */
636 struct cdbdata *cdb_next_item(int cdb)
637 {
638         DBT key, data;
639         struct cdbdata *cdbret;
640         int ret = 0;
641
642         /* Initialize the key/data pair so the flags aren't set. */
643         memset(&key, 0, sizeof(key));
644         memset(&data, 0, sizeof(data));
645         data.flags = DB_DBT_MALLOC;
646
647         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb],
648                 &key, &data, DB_NEXT);
649         
650         if (ret) {
651                 if (ret != DB_NOTFOUND) {
652                         lprintf(1, "cdb_next_item(%d): %s\n",
653                                 cdb, db_strerror(ret));
654                         abort();
655                 }
656                 cclose(MYCURSORS[cdb]);
657                 MYCURSORS[cdb] = NULL;
658                 return NULL;            /* presumably, end of file */
659         }
660
661         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
662         cdbret->len = data.size;
663         cdbret->ptr = data.data;
664
665         return (cdbret);
666 }
667
668
669
670 /*
671  * Transaction-based stuff.  I'm writing this as I bake cookies...
672  */
673
674 void cdb_begin_transaction(void) {
675
676   bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
677
678   if (MYTID != NULL)
679     {
680       lprintf(1, "cdb_begin_transaction: ERROR: nested transaction\n");
681       abort();
682     }
683
684   txbegin(&MYTID);
685 }
686
687 void cdb_end_transaction(void) {
688   int i;
689
690   for (i = 0; i < MAXCDB; i++)
691     if (MYCURSORS[i] != NULL) {
692       lprintf(1, "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n", i);
693       cclose(MYCURSORS[i]);
694       MYCURSORS[i] = NULL;
695     }
696
697   if (MYTID == NULL)
698     {
699       lprintf(1, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
700       abort();
701     }
702   else
703     txcommit(MYTID);
704
705   MYTID = NULL;
706 }
707
708 /*
709  * Truncate (delete every record)
710  */
711 void cdb_trunc(int cdb)
712 {
713   DB_TXN *tid;
714   int ret;
715   u_int32_t count;
716   
717   if (MYTID != NULL)
718     {
719       ret = dbp[cdb]->truncate(dbp[cdb],        /* db */
720                                MYTID,           /* transaction ID */
721                                &count,          /* #rows that were deleted */
722                                0);              /* flags */
723       if (ret)
724         {
725           lprintf(1, "cdb_truncate(%d): %s\n", cdb,
726                   db_strerror(ret));
727           abort();
728         }
729     }
730   else
731     {
732       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
733       
734     retry:
735       txbegin(&tid);
736       
737       if ((ret = dbp[cdb]->truncate(dbp[cdb],    /* db */
738                                     tid,         /* transaction ID */
739                                     &count,      /* #rows deleted */
740                                     0)))         /* flags */
741         {
742           if (ret == DB_LOCK_DEADLOCK)
743             {
744               txabort(tid);
745               goto retry;
746             }
747           else
748             {
749               lprintf(1, "cdb_truncate(%d): %s\n", cdb,
750                       db_strerror(ret));
751               abort();
752             }
753         }
754       else
755         {
756           txcommit(tid);
757         }
758     }
759 }