9bbc86b924ed9e0f80bd70c5712af1b917e0322f
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31 #include <db.h>
32 #include <pthread.h>
33 #include "citadel.h"
34 #include "server.h"
35 #include "citserver.h"
36 #include "database.h"
37 #include "sysdep_decls.h"
38 #include "dynloader.h"
39
40 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
41 static DB_ENV *dbenv;           /* The DB environment (global) */
42
43 struct cdbtsd {                 /* Thread-specific DB stuff */
44         DB_TXN *tid;            /* Transaction handle */
45         DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
46 };
47
48 static pthread_key_t tsdkey;
49
50 #define MYCURSORS       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors)
51 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
52
53 /* just a little helper function */
54 static void txabort(DB_TXN *tid) {
55         int ret = txn_abort(tid);
56
57         if (ret) {
58                 lprintf(1, "cdb_*: txn_abort: %s\n", db_strerror(ret));
59                 abort();
60         }
61 }
62
63 /* this one is even more helpful than the last. */
64 static void txcommit(DB_TXN *tid) {
65         int ret = txn_commit(tid, 0);
66
67         if (ret) {
68                 lprintf(1, "cdb_*: txn_commit: %s\n", db_strerror(ret));
69                 abort();
70         }
71 }
72
73 /* are you sensing a pattern yet? */
74 static void txbegin(DB_TXN **tid) {
75         int ret = txn_begin(dbenv, NULL, tid, 0);
76
77         if (ret) {
78                 lprintf(1, "cdb_*: txn_begin: %s\n", db_strerror(ret));
79                 abort();
80         }
81 }
82
83 static void cclose(DBC *cursor) {
84         int ret;
85
86         if ((ret = cursor->c_close(cursor))) {
87                 lprintf(1, "cdb_*: c_close: %s\n", db_strerror(ret));
88                 abort();
89         }
90 }
91
92 static void bailIfCursor(DBC **cursors, const char *msg)
93 {
94   int i;
95
96   for (i = 0; i < MAXCDB; i++)
97     if (cursors[i] != NULL)
98       {
99         lprintf(1, "cdb_*: cursor still in progress on cdb %d: %s\n", i, msg);
100         abort();
101       }
102 }
103
104 static void check_handles(void *arg) {
105         if (arg != NULL) {
106                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
107
108                 bailIfCursor(tsd->cursors, "in check_handles");
109
110                 if (tsd->tid != NULL) {
111                         lprintf(1, "cdb_*: transaction still in progress!");
112                         abort();
113                 }
114         }
115 }
116
117 static void dest_tsd(void *arg) {
118         if (arg != NULL) {
119                 check_handles(arg);
120                 phree(arg);
121         }
122 }
123
124 /*
125  * Ensure that we have a key for thread-specific data.  We don't
126  * put anything in here that Citadel cares about; this is just database
127  * related stuff like cursors and transactions.
128  *
129  * This should be called immediately after startup by any thread which wants
130  * to use database calls, except for whatever thread calls open_databases.
131  */
132 void cdb_allocate_tsd(void) {
133         struct cdbtsd *tsd;
134
135         if (pthread_getspecific(tsdkey) != NULL)
136                 return;
137
138         tsd = mallok(sizeof *tsd);
139
140         tsd->tid = NULL;
141
142         memset(tsd->cursors, 0, sizeof tsd->cursors);
143         pthread_setspecific(tsdkey, tsd);
144 }
145
146 void cdb_free_tsd(void) {
147         dest_tsd(pthread_getspecific(tsdkey));
148         pthread_setspecific(tsdkey, NULL);
149 }
150
151 void cdb_check_handles(void) {
152         check_handles(pthread_getspecific(tsdkey));
153 }
154
155
156 /*
157  * Reclaim unused space in the databases.  We need to do each one of
158  * these discretely, rather than in a loop.
159  *
160  * This is a stub function in the Sleepycat DB backend, because there is no
161  * such API call available.
162  */
163 void defrag_databases(void)
164 {
165         /* do nothing */
166 }
167
168
169 /*
170  * Cull the database logs
171  */
172 static void cdb_cull_logs(void) {
173         DIR *dp;
174         struct dirent *d;
175         char filename[SIZ];
176         struct stat statbuf;
177
178         lprintf(5, "Database log file cull started.\n");
179
180         dp = opendir("data");
181         if (dp == NULL) return;
182
183         while (d = readdir(dp), d != NULL) {
184                 if (!strncasecmp(d->d_name, "log.", 4)) {
185                         sprintf(filename, "./data/%s", d->d_name);
186                         stat(filename, &statbuf);
187                         if ((time(NULL) - statbuf.st_mtime) > 432000L) {
188                                 lprintf(5, "%s ... deleted\n", filename);
189                                 unlink(filename);
190                         }
191                         else {
192                                 lprintf(5, "%s ... kept\n", filename);
193                         }
194                 }
195         }
196
197         closedir(dp);
198
199         lprintf(5, "Database log file cull ended.\n");
200 }
201
202
203 /*
204  * Request a checkpoint of the database.
205  */
206 static void cdb_checkpoint(void) {
207         int ret;
208         static time_t last_cull = 0L;
209
210         ret = txn_checkpoint(dbenv,
211                                 MAX_CHECKPOINT_KBYTES,
212                                 MAX_CHECKPOINT_MINUTES,
213                                 0);
214         if ( (ret != 0) && (ret != DB_INCOMPLETE) ) {
215                 lprintf(1, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
216                 abort();
217         }
218
219         if (ret == DB_INCOMPLETE) {
220                 lprintf(3, "WARNING: txn_checkpoint: %s\n", db_strerror(ret));
221         }
222
223         /* Cull the logs if we haven't done so for 24 hours */
224         if ((time(NULL) - last_cull) > 86400L) {
225                 last_cull = time(NULL);
226                 cdb_cull_logs();
227         }
228
229 }
230
231 /*
232  * Open the various databases we'll be using.  Any database which
233  * does not exist should be created.  Note that we don't need an S_DATABASE
234  * critical section here, because there aren't any active threads manipulating
235  * the database yet -- and besides, it causes problems on BSDI.
236  */
237 void open_databases(void)
238 {
239         int ret;
240         int i;
241         char dbfilename[SIZ];
242         u_int32_t flags = 0;
243
244         lprintf(9, "cdb_*: open_databases() starting\n");
245         /*
246          * Silently try to create the database subdirectory.  If it's
247          * already there, no problem.
248          */
249         system("exec mkdir data 2>/dev/null");
250
251         lprintf(9, "cdb_*: Setting up DB environment\n");
252         db_env_set_func_yield(sched_yield);
253         ret = db_env_create(&dbenv, 0);
254         if (ret) {
255                 lprintf(1, "cdb_*: db_env_create: %s\n", db_strerror(ret));
256                 exit(ret);
257         }
258         dbenv->set_errpfx(dbenv, "citserver");
259
260         /*
261          * We want to specify the shared memory buffer pool cachesize,
262          * but everything else is the default.
263          */
264         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
265         if (ret) {
266                 lprintf(1, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
267                 dbenv->close(dbenv, 0);
268                 exit(ret);
269         }
270
271         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
272                 lprintf(1, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
273                 dbenv->close(dbenv, 0);
274                 exit(ret);
275         }
276
277         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
278                 DB_INIT_LOCK|DB_THREAD;
279         ret = dbenv->open(dbenv, "./data", flags, 0);
280         if (ret) {
281                 lprintf(1, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
282                 dbenv->close(dbenv, 0);
283                 exit(ret);
284         }
285
286         lprintf(7, "cdb_*: Starting up DB\n");
287
288         for (i = 0; i < MAXCDB; ++i) {
289
290                 /* Create a database handle */
291                 ret = db_create(&dbp[i], dbenv, 0);
292                 if (ret) {
293                         lprintf(1, "cdb_*: db_create: %s\n", db_strerror(ret));
294                         exit(ret);
295                 }
296
297
298                 /* Arbitrary names for our tables -- we reference them by
299                  * number, so we don't have string names for them.
300                  */
301                 sprintf(dbfilename, "cdb.%02x", i);
302
303                 ret = dbp[i]->open(dbp[i],
304                                 dbfilename,
305                                 NULL,
306                                 DB_BTREE,
307                                 DB_CREATE|DB_THREAD,
308                                 0600);
309                 if (ret) {
310                         lprintf(1, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
311                         exit(ret);
312                 }
313         }
314
315         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
316                 lprintf(1, "cdb_*: pthread_key_create: %s\n", strerror(ret));
317                 exit(1);
318         }
319
320         cdb_allocate_tsd();
321         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
322         lprintf(9, "cdb_*: open_databases() finished\n");
323 }
324
325
326 /*
327  * Close all of the db database files we've opened.  This can be done
328  * in a loop, since it's just a bunch of closes.
329  */
330 void close_databases(void)
331 {
332         int a;
333         int ret;
334
335         cdb_free_tsd();
336
337         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
338                 lprintf(1, "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
339                 abort();
340         }
341
342         for (a = 0; a < MAXCDB; ++a) {
343                 lprintf(7, "cdb_*: Closing database %d\n", a);
344                 ret = dbp[a]->close(dbp[a], 0);
345                 if (ret) {
346                         lprintf(1, "cdb_*: db_close: %s\n", db_strerror(ret));
347                         abort();
348                 }
349                 
350         }
351
352         /* Close the handle. */
353         ret = dbenv->close(dbenv, 0);
354         if (ret) {
355                 lprintf(1, "cdb_*: DBENV->close: %s\n", db_strerror(ret));
356                 abort();
357         }
358 }
359
360 /*
361  * Store a piece of data.  Returns 0 if the operation was successful.  If a
362  * key already exists it should be overwritten.
363  */
364 int cdb_store(int cdb,
365               void *ckey, int ckeylen,
366               void *cdata, int cdatalen)
367 {
368
369   DBT dkey, ddata;
370   DB_TXN *tid;
371   int ret;
372   
373   memset(&dkey, 0, sizeof(DBT));
374   memset(&ddata, 0, sizeof(DBT));
375   dkey.size = ckeylen;
376   dkey.data = ckey;
377   ddata.size = cdatalen;
378   ddata.data = cdata;
379   
380   if (MYTID != NULL)
381     {
382       ret = dbp[cdb]->put(dbp[cdb],             /* db */
383                           MYTID,                /* transaction ID */
384                           &dkey,                /* key */
385                           &ddata,               /* data */
386                           0);           /* flags */
387       if (ret)
388         {
389           lprintf(1, "cdb_store(%d): %s\n", cdb,
390                   db_strerror(ret));
391           abort();
392         }
393       return ret;
394       
395     }
396   else
397     {
398       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
399       
400     retry:
401       txbegin(&tid);
402       
403       if ((ret = dbp[cdb]->put(dbp[cdb],    /* db */
404                                tid,         /* transaction ID */
405                                &dkey,       /* key */
406                                &ddata,      /* data */
407                                0)))         /* flags */
408         {
409           if (ret == DB_LOCK_DEADLOCK)
410             {
411               txabort(tid);
412               goto retry;
413             }
414           else
415             {
416               lprintf(1, "cdb_store(%d): %s\n", cdb,
417                       db_strerror(ret));
418               abort();
419             }
420         }
421       else
422         {
423           txcommit(tid);
424           return ret;
425         }
426     }
427 }
428
429
430 /*
431  * Delete a piece of data.  Returns 0 if the operation was successful.
432  */
433 int cdb_delete(int cdb, void *key, int keylen)
434 {
435
436   DBT dkey;
437   DB_TXN *tid;
438   int ret;
439
440   memset(&dkey, 0, sizeof dkey);
441   dkey.size = keylen;
442   dkey.data = key;
443
444   if (MYTID != NULL)
445     {
446       ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
447       if (ret)
448         {
449           lprintf(1, "cdb_delete(%d): %s\n", cdb,
450                   db_strerror(ret));
451           if (ret != DB_NOTFOUND)
452             abort();
453         }
454     }
455   else
456     {
457       bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
458     
459     retry:
460       txbegin(&tid);
461     
462       if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
463           && ret != DB_NOTFOUND)
464         {
465           if (ret == DB_LOCK_DEADLOCK)
466             {
467               txabort(tid);
468               goto retry;
469             }
470           else
471             {
472               lprintf(1, "cdb_delete(%d): %s\n", cdb,
473                       db_strerror(ret));
474               abort();
475             }
476         }
477       else
478         {
479           txcommit(tid);
480         }
481     }
482   return ret;
483 }
484
485 static DBC *localcursor(int cdb)
486 {
487   int ret;
488   DBC *curs;
489
490   if (MYCURSORS[cdb] == NULL)
491     ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
492   else
493     ret = MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, DB_POSITION);
494
495   if (ret)
496     {
497       lprintf(1, "localcursor: %s\n", db_strerror(ret));
498       abort();
499     }
500
501   return curs;
502 }
503
504
505 /*
506  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
507  * a struct cdbdata which it is the caller's responsibility to free later on
508  * using the cdb_free() routine.
509  */
510 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
511 {
512
513   struct cdbdata *tempcdb;
514   DBT dkey, dret;
515   int ret;
516
517   memset(&dkey, 0, sizeof(DBT));
518   dkey.size = keylen;
519   dkey.data = key;
520
521   if (MYTID != NULL)
522     {
523       memset(&dret, 0, sizeof(DBT));
524       dret.flags = DB_DBT_MALLOC;
525       ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
526     }
527   else
528     {
529       DBC *curs;
530
531       do
532         {
533           memset(&dret, 0, sizeof(DBT));
534           dret.flags = DB_DBT_MALLOC;
535
536           curs = localcursor(cdb);
537
538           ret = curs->c_get(curs, &dkey, &dret, DB_SET);
539           cclose(curs);
540         }
541       while (ret == DB_LOCK_DEADLOCK);
542
543     }
544
545   if ((ret != 0) && (ret != DB_NOTFOUND))
546     {
547       lprintf(1, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
548       abort();
549     }
550
551   if (ret != 0) return NULL;
552   tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
553
554   if (tempcdb == NULL)
555     {
556       lprintf(2, "cdb_fetch: Cannot allocate memory for tempcdb\n");
557       abort();
558     }
559
560   tempcdb->len = dret.size;
561   tempcdb->ptr = dret.data;
562   return (tempcdb);
563 }
564
565
566 /*
567  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
568  * more complex stuff with other database managers in the future).
569  */
570 void cdb_free(struct cdbdata *cdb)
571 {
572         phree(cdb->ptr);
573         phree(cdb);
574 }
575
576 void cdb_close_cursor(int cdb)
577 {
578         if (MYCURSORS[cdb] != NULL)
579                 cclose(MYCURSORS[cdb]);
580
581         MYCURSORS[cdb] = NULL;
582 }
583
584 /* 
585  * Prepare for a sequential search of an entire database.
586  * (There is guaranteed to be no more than one traversal in
587  * progress per thread at any given time.)
588  */
589 void cdb_rewind(int cdb)
590 {
591         int ret = 0;
592
593         if (MYCURSORS[cdb] != NULL)
594                 cclose(MYCURSORS[cdb]);
595
596         /*
597          * Now initialize the cursor
598          */
599         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
600         if (ret) {
601                 lprintf(1, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
602                 abort();
603         }
604 }
605
606
607 /*
608  * Fetch the next item in a sequential search.  Returns a pointer to a 
609  * cdbdata structure, or NULL if we've hit the end.
610  */
611 struct cdbdata *cdb_next_item(int cdb)
612 {
613         DBT key, data;
614         struct cdbdata *cdbret;
615         int ret = 0;
616
617         /* Initialize the key/data pair so the flags aren't set. */
618         memset(&key, 0, sizeof(key));
619         memset(&data, 0, sizeof(data));
620         data.flags = DB_DBT_MALLOC;
621
622         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb],
623                 &key, &data, DB_NEXT);
624         
625         if (ret) {
626                 if (ret != DB_NOTFOUND) {
627                         lprintf(1, "cdb_next_item(%d): %s\n",
628                                 cdb, db_strerror(ret));
629                         abort();
630                 }
631                 cclose(MYCURSORS[cdb]);
632                 MYCURSORS[cdb] = NULL;
633                 return NULL;            /* presumably, end of file */
634         }
635
636         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
637         cdbret->len = data.size;
638         cdbret->ptr = data.data;
639
640         return (cdbret);
641 }
642
643
644 /*
645  * Transaction-based stuff.  I'm writing this as I bake cookies...
646  */
647
648 void cdb_begin_transaction(void) {
649
650   bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
651
652   if (MYTID != NULL)
653     {
654       lprintf(1, "cdb_begin_transaction: ERROR: nested transaction\n");
655       abort();
656     }
657
658   txbegin(&MYTID);
659 }
660
661 void cdb_end_transaction(void) {
662   int i;
663
664   for (i = 0; i < MAXCDB; i++)
665     if (MYCURSORS[i] != NULL) {
666       lprintf(1, "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n", i);
667       cclose(MYCURSORS[i]);
668       MYCURSORS[i] = NULL;
669     }
670
671   if (MYTID == NULL)
672     {
673       lprintf(1, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
674       abort();
675     }
676   else
677     txcommit(MYTID);
678
679   MYTID = NULL;
680 }
681