4 * This is a data store backend for the Citadel server which uses Berkeley DB.
8 /*****************************************************************************
9 Tunable configuration parameters for the Berkeley DB back end
10 *****************************************************************************/
/* Citadel periodically asks Berkeley DB to checkpoint the database.  The
 * request only results in a checkpoint if at least the specified number of
 * kilobytes has been written to the log, or if the specified number of
 * minutes has passed, since the last checkpoint.
 */
16 #define MAX_CHECKPOINT_KBYTES 256
17 #define MAX_CHECKPOINT_MINUTES 15
19 /*****************************************************************************/
28 #include <sys/types.h>
34 #elif defined(HAVE_DB4_DB_H)
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer. Please upgrade.
46 #include <libcitadel.h>
49 #include "citserver.h"
52 #include "sysdep_decls.h"
57 #include "ctdl_module.h"
60 static DB *dbp[MAXCDB]; /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv; /* The DB environment (global) */
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
72 if (!IsEmptyStr(msg)) {
73 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
78 /* Verbose logging callback */
79 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
81 CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
85 /* just a little helper function */
86 static void txabort(DB_TXN * tid)
90 ret = tid->abort(tid);
93 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_abort: %s\n",
99 /* this one is even more helpful than the last. */
100 static void txcommit(DB_TXN * tid)
104 ret = tid->commit(tid, 0);
107 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret));
112 /* are you sensing a pattern yet? */
113 static void txbegin(DB_TXN ** tid)
117 ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
120 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret));
125 static void dbpanic(DB_ENV * env, int errval)
127 CtdlLogPrintf(CTDL_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval));
130 static void cclose(DBC * cursor)
134 if ((ret = cursor->c_close(cursor))) {
135 CtdlLogPrintf(CTDL_EMERG, "bdb(): c_close: %s\n", db_strerror(ret));
140 static void bailIfCursor(DBC ** cursors, const char *msg)
144 for (i = 0; i < MAXCDB; i++)
145 if (cursors[i] != NULL) {
146 CtdlLogPrintf(CTDL_EMERG,
147 "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg);
152 void check_handles(void *arg)
155 ThreadTSD *tsd = (ThreadTSD *) arg;
157 bailIfCursor(tsd->cursors, "in check_handles");
159 if (tsd->tid != NULL) {
160 CtdlLogPrintf(CTDL_EMERG,
161 "bdb(): transaction still in progress!");
167 void cdb_check_handles(void)
169 check_handles(pthread_getspecific(ThreadKey));
174 * Cull the database logs
176 static void cdb_cull_logs(void)
185 /* Get the list of names. */
186 if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
187 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
191 /* Print the list of names. */
193 for (file = list; *file != NULL; ++file) {
194 CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
197 snprintf(errmsg, sizeof(errmsg),
198 " ** ERROR **\n \n \n "
199 "Citadel was unable to delete the "
200 "database log file '%s' because of the "
201 "following error:\n \n %s\n \n"
202 " This log file is no longer in use "
203 "and may be safely deleted.\n",
204 *file, strerror(errno));
205 aide_message(errmsg, "Database Warning Message");
213 * Manually initiate log file cull.
215 void cmd_cull(char *argbuf) {
216 if (CtdlAccessCheck(ac_internal)) return;
218 cprintf("%d Database log file cull completed.\n", CIT_OK);
223 * Request a checkpoint of the database. Called once per minute by the thread manager.
225 void cdb_checkpoint(void)
229 CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
230 ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
233 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
237 /* After a successful checkpoint, we can cull the unused logs */
238 if (config.c_auto_cull) {
246 * Open the various databases we'll be using. Any database which
247 * does not exist should be created. Note that we don't need a
248 * critical section here, because there aren't any active threads
249 * manipulating the database yet.
251 void open_databases(void)
257 int dbversion_major, dbversion_minor, dbversion_patch;
258 int current_dbversion = 0;
260 CtdlLogPrintf(CTDL_DEBUG, "bdb(): open_databases() starting\n");
261 CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
262 CtdlLogPrintf(CTDL_INFO, " Linked db: %s\n",
263 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
265 current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
267 CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
268 CtdlLogPrintf(CTDL_DEBUG, " Previous dbversion: %d\n", CitControl.MMdbversion);
270 if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
271 && (CitControl.MMdbversion > current_dbversion) ) {
272 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
273 "of Berkeley DB that is older than that which last created or\n"
274 "updated the database. Because this would probably cause data\n"
275 "corruption or loss, the server is aborting execution now.\n");
279 CitControl.MMdbversion = current_dbversion;
283 CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
287 * Silently try to create the database subdirectory. If it's
288 * already there, no problem.
290 mkdir(ctdl_data_dir, 0700);
291 chmod(ctdl_data_dir, 0700);
292 chown(ctdl_data_dir, CTDLUID, (-1));
294 CtdlLogPrintf(CTDL_DEBUG, "bdb(): Setting up DB environment\n");
295 db_env_set_func_yield(sched_yield);
296 ret = db_env_create(&dbenv, 0);
298 CtdlLogPrintf(CTDL_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
299 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
302 dbenv->set_errpfx(dbenv, "citserver");
303 dbenv->set_paniccall(dbenv, dbpanic);
304 dbenv->set_errcall(dbenv, cdb_verbose_err);
305 dbenv->set_errpfx(dbenv, "ctdl");
306 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
307 dbenv->set_msgcall(dbenv, cdb_verbose_log);
309 dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
310 dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
313 * We want to specify the shared memory buffer pool cachesize,
314 * but everything else is the default.
316 ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
318 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
319 dbenv->close(dbenv, 0);
320 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
324 if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
325 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
326 dbenv->close(dbenv, 0);
327 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
331 flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
332 CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
333 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
334 if (ret == DB_RUNRECOVERY) {
335 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
336 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
338 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
340 if (ret == DB_RUNRECOVERY) {
341 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
342 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
343 flags &= ~DB_RECOVER;
344 flags |= DB_RECOVER_FATAL;
345 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
348 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
349 dbenv->close(dbenv, 0);
350 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
354 CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
356 for (i = 0; i < MAXCDB; ++i) {
358 /* Create a database handle */
359 ret = db_create(&dbp[i], dbenv, 0);
361 CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
362 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
367 /* Arbitrary names for our tables -- we reference them by
368 * number, so we don't have string names for them.
370 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
372 ret = dbp[i]->open(dbp[i],
377 DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
380 CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
382 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
384 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
392 /* Make sure we own all the files, because in a few milliseconds
393 * we're going to drop root privs.
395 void cdb_chmod_data(void) {
398 char filename[PATH_MAX];
400 dp = opendir(ctdl_data_dir);
402 while (d = readdir(dp), d != NULL) {
403 if (d->d_name[0] != '.') {
404 snprintf(filename, sizeof filename,
405 "%s/%s", ctdl_data_dir, d->d_name);
406 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
407 filename, chmod(filename, 0600)
409 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
410 filename, chown(filename, CTDLUID, (-1))
417 CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
419 CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
424 * Close all of the db database files we've opened. This can be done
425 * in a loop, since it's just a bunch of closes.
427 void close_databases(void)
432 ctdl_thread_internal_free_tsd();
434 if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
435 CtdlLogPrintf(CTDL_EMERG,
436 "txn_checkpoint: %s\n", db_strerror(ret));
439 /* print some statistics... */
441 dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
444 /* close the tables */
445 for (a = 0; a < MAXCDB; ++a) {
446 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
447 ret = dbp[a]->close(dbp[a], 0);
449 CtdlLogPrintf(CTDL_EMERG,
450 "db_close: %s\n", db_strerror(ret));
455 /* Close the handle. */
456 ret = dbenv->close(dbenv, 0);
458 CtdlLogPrintf(CTDL_EMERG,
459 "DBENV->close: %s\n", db_strerror(ret));
465 * Compression functions only used if we have zlib
467 void cdb_decompress_if_necessary(struct cdbdata *cdb)
469 static int magic = COMPRESS_MAGIC;
473 if (cdb->ptr == NULL)
475 if (memcmp(cdb->ptr, &magic, sizeof(magic)))
479 /* At this point we know we're looking at a compressed item. */
481 struct CtdlCompressHeader zheader;
482 char *uncompressed_data;
483 char *compressed_data;
484 uLongf destLen, sourceLen;
486 memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
488 compressed_data = cdb->ptr;
489 compressed_data += sizeof(struct CtdlCompressHeader);
491 sourceLen = (uLongf) zheader.compressed_len;
492 destLen = (uLongf) zheader.uncompressed_len;
493 uncompressed_data = malloc(zheader.uncompressed_len);
495 if (uncompress((Bytef *) uncompressed_data,
496 (uLongf *) & destLen,
497 (const Bytef *) compressed_data,
498 (uLong) sourceLen) != Z_OK) {
499 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
504 cdb->len = (size_t) destLen;
505 cdb->ptr = uncompressed_data;
506 #else /* HAVE_ZLIB */
507 CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
509 #endif /* HAVE_ZLIB */
515 * Store a piece of data. Returns 0 if the operation was successful. If a
516 * key already exists it should be overwritten.
518 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
526 struct CtdlCompressHeader zheader;
527 char *compressed_data = NULL;
529 size_t buffer_len = 0;
533 memset(&dkey, 0, sizeof(DBT));
534 memset(&ddata, 0, sizeof(DBT));
537 ddata.size = cdatalen;
541 /* Only compress Visit records. Everything else is uncompressed. */
542 if (cdb == CDB_VISIT) {
544 zheader.magic = COMPRESS_MAGIC;
545 zheader.uncompressed_len = cdatalen;
546 buffer_len = ((cdatalen * 101) / 100) + 100
547 + sizeof(struct CtdlCompressHeader);
548 destLen = (uLongf) buffer_len;
549 compressed_data = malloc(buffer_len);
550 if (compress2((Bytef *) (compressed_data +
552 CtdlCompressHeader)),
553 &destLen, (Bytef *) cdata, (uLongf) cdatalen,
555 CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
558 zheader.compressed_len = (size_t) destLen;
559 memcpy(compressed_data, &zheader,
560 sizeof(struct CtdlCompressHeader));
561 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
562 zheader.compressed_len);
563 ddata.data = compressed_data;
568 ret = dbp[cdb]->put(dbp[cdb], /* db */
569 MYTID, /* transaction ID */
574 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
580 free(compressed_data);
585 bailIfCursor(MYCURSORS,
586 "attempt to write during r/o cursor");
591 if ((ret = dbp[cdb]->put(dbp[cdb], /* db */
592 tid, /* transaction ID */
596 if (ret == DB_LOCK_DEADLOCK) {
600 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
601 cdb, db_strerror(ret));
608 free(compressed_data);
617 * Delete a piece of data. Returns 0 if the operation was successful.
619 int cdb_delete(int cdb, void *key, int keylen)
626 memset(&dkey, 0, sizeof dkey);
631 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
633 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
635 if (ret != DB_NOTFOUND)
639 bailIfCursor(MYCURSORS,
640 "attempt to delete during r/o cursor");
645 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
646 && ret != DB_NOTFOUND) {
647 if (ret == DB_LOCK_DEADLOCK) {
651 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
652 cdb, db_strerror(ret));
662 static DBC *localcursor(int cdb)
667 if (MYCURSORS[cdb] == NULL)
668 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
671 MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
675 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
684 * Fetch a piece of data. If not found, returns NULL. Otherwise, it returns
685 * a struct cdbdata which it is the caller's responsibility to free later on
686 * using the cdb_free() routine.
688 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
691 struct cdbdata *tempcdb;
695 memset(&dkey, 0, sizeof(DBT));
700 memset(&dret, 0, sizeof(DBT));
701 dret.flags = DB_DBT_MALLOC;
702 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
707 memset(&dret, 0, sizeof(DBT));
708 dret.flags = DB_DBT_MALLOC;
710 curs = localcursor(cdb);
712 ret = curs->c_get(curs, &dkey, &dret, DB_SET);
715 while (ret == DB_LOCK_DEADLOCK);
719 if ((ret != 0) && (ret != DB_NOTFOUND)) {
720 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
727 tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
729 if (tempcdb == NULL) {
730 CtdlLogPrintf(CTDL_EMERG,
731 "cdb_fetch: Cannot allocate memory for tempcdb\n");
735 tempcdb->len = dret.size;
736 tempcdb->ptr = dret.data;
737 cdb_decompress_if_necessary(tempcdb);
743 * Free a cdbdata item.
745 * Note that we only free the 'ptr' portion if it is not NULL. This allows
746 * other code to assume ownership of that memory simply by storing the
747 * pointer elsewhere and then setting 'ptr' to NULL. cdb_free() will then
750 void cdb_free(struct cdbdata *cdb)
758 void cdb_close_cursor(int cdb)
760 if (MYCURSORS[cdb] != NULL)
761 cclose(MYCURSORS[cdb]);
763 MYCURSORS[cdb] = NULL;
767 * Prepare for a sequential search of an entire database.
768 * (There is guaranteed to be no more than one traversal in
769 * progress per thread at any given time.)
771 void cdb_rewind(int cdb)
775 if (MYCURSORS[cdb] != NULL) {
776 CtdlLogPrintf(CTDL_EMERG,
777 "cdb_rewind: must close cursor on database %d before reopening.\n",
780 /* cclose(MYCURSORS[cdb]); */
784 * Now initialize the cursor
786 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
788 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
796 * Fetch the next item in a sequential search. Returns a pointer to a
797 * cdbdata structure, or NULL if we've hit the end.
799 struct cdbdata *cdb_next_item(int cdb)
802 struct cdbdata *cdbret;
805 /* Initialize the key/data pair so the flags aren't set. */
806 memset(&key, 0, sizeof(key));
807 memset(&data, 0, sizeof(data));
808 data.flags = DB_DBT_MALLOC;
810 ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
813 if (ret != DB_NOTFOUND) {
814 CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
815 cdb, db_strerror(ret));
818 cclose(MYCURSORS[cdb]);
819 MYCURSORS[cdb] = NULL;
820 return NULL; /* presumably, end of file */
823 cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
824 cdbret->len = data.size;
825 cdbret->ptr = data.data;
826 cdb_decompress_if_necessary(cdbret);
834 * Transaction-based stuff. I'm writing this as I bake cookies...
837 void cdb_begin_transaction(void)
840 bailIfCursor(MYCURSORS,
841 "can't begin transaction during r/o cursor");
844 CtdlLogPrintf(CTDL_EMERG,
845 "cdb_begin_transaction: ERROR: nested transaction\n");
852 void cdb_end_transaction(void)
856 for (i = 0; i < MAXCDB; i++)
857 if (MYCURSORS[i] != NULL) {
858 CtdlLogPrintf(CTDL_WARNING,
859 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
861 cclose(MYCURSORS[i]);
866 CtdlLogPrintf(CTDL_EMERG,
867 "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
876 * Truncate (delete every record)
878 void cdb_trunc(int cdb)
885 CtdlLogPrintf(CTDL_EMERG,
886 "cdb_trunc must not be called in a transaction.\n");
889 bailIfCursor(MYCURSORS,
890 "attempt to write during r/o cursor");
895 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
896 NULL, /* transaction ID */
897 &count, /* #rows deleted */
899 if (ret == DB_LOCK_DEADLOCK) {
903 CtdlLogPrintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
905 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");