]> code.citadel.org Git - citadel.git/blobdiff - citadel/server/backends/berkeley_db/berkeley_db.c
bdb_decompress_if_necessary() now operates on DBT instead of cdbdata
[citadel.git] / citadel / server / backends / berkeley_db / berkeley_db.c
index b16c0e57d3f0ed73b1d1de0f2798265fbe097ebf..60063099e47cd3453a053b1fab5935ec563f0e03 100644 (file)
 #define MAX_CHECKPOINT_MINUTES 15
 
 #include "../../sysdep.h"
-#include <stdlib.h>
-#include <unistd.h>
 #include <sys/stat.h>
 #include <stdio.h>
-#include <dirent.h>
 #include <zlib.h>
 #include <db.h>
 #include <libcitadel.h>
-#include "../../ctdl_module.h"
-#include "../../control.h"
 #include "../../citserver.h"
 #include "../../config.h"
 #include "berkeley_db.h"
 #error Citadel requires Berkeley DB v18.0 or newer.  Please upgrade.
 #endif
 
+// Globals (these are used across multiple functions in *this* module, but not elsewhere)
+static DB *bdb_table[MAXCDB];  // One DB handle for each Citadel database
+static DB_ENV *bdb_env;                // The DB environment (global)
 
-static DB *dbp[MAXCDB];                // One DB handle for each Citadel database
-static DB_ENV *dbenv;          // The DB environment (global)
+
+// These are items that we need to store "per thread" rather than "per session".
+struct bdb_tsd {
+       DB_TXN *tid;            // Transaction handle
+       DBC *cursors[MAXCDB];   // Cursors, for traversals...
+       DBT dbkey[MAXCDB];
+       DBT dbdata[MAXCDB];
+};
+
+
+pthread_key_t bdb_thread_key;
+#define TSD bdb_get_tsd()
+
+
+// Some functions in this backend need to store some per-thread data.
+// This returns the pointer to the current thread's per-thread data block, creating it if necessary.
+struct bdb_tsd *bdb_get_tsd(void) {
+
+        struct bdb_tsd *c = (struct bdb_tsd *) pthread_getspecific(bdb_thread_key) ;
+       if (c != NULL) {
+               return(c);              // Got it.
+       }
+
+       // If there's no TSD for this thread, it must be a new thread.  Create our TSD region.
+       c = (struct bdb_tsd *) malloc(sizeof(struct bdb_tsd));
+       memset(c, 0, sizeof(struct bdb_tsd));
+       pthread_setspecific(bdb_thread_key, (const void *) c);
+       return(c);
+}
 
 
 // Called by other functions in this module to GTFO quickly if we need to.  Not part of the backend API.
@@ -46,7 +71,7 @@ void bdb_abort(void) {
 
 
 // Verbose logging callback for Berkeley DB.  Not part of the backend API.
-void bdb_verbose_log(const DB_ENV *dbenv, const char *msg, const char *foo) {
+void bdb_verbose_log(const DB_ENV *bdb_env, const char *msg, const char *foo) {
        if (!IsEmptyStr(msg)) {
                syslog(LOG_DEBUG, "bdb: %s %s", msg, foo);
        }
@@ -54,12 +79,12 @@ void bdb_verbose_log(const DB_ENV *dbenv, const char *msg, const char *foo) {
 
 
 // Verbose error logging callback for Berkeley DB.  Not part of the backend API.
-void bdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg) {
+void bdb_verbose_err(const DB_ENV *bdb_env, const char *errpfx, const char *msg) {
        syslog(LOG_ERR, "bdb: %s", msg);
 }
 
 
-// wrapper for txn_abort() that logs/aborts on error
+// Wrapper for txn_abort() that logs/aborts on error.  Not part of the backend API.
 static void bdb_txabort(DB_TXN *tid) {
        int ret;
 
@@ -72,7 +97,7 @@ static void bdb_txabort(DB_TXN *tid) {
 }
 
 
-// wrapper for txn_commit() that logs/aborts on error
+// Wrapper for txn_commit() that logs/aborts on error.  Not part of the backend API.
 static void bdb_txcommit(DB_TXN *tid) {
        int ret;
 
@@ -85,11 +110,11 @@ static void bdb_txcommit(DB_TXN *tid) {
 }
 
 
-// wrapper for txn_begin() that logs/aborts on error
+// Wrapper for txn_begin() that logs/aborts on error.  Not part of the backend API.
 static void bdb_txbegin(DB_TXN **tid) {
        int ret;
 
-       ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
+       ret = bdb_env->txn_begin(bdb_env, NULL, tid, 0);
 
        if (ret) {
                syslog(LOG_ERR, "bdb: txn_begin: %s", db_strerror(ret));
@@ -98,13 +123,14 @@ static void bdb_txbegin(DB_TXN **tid) {
 }
 
 
-// panic callback
+// Panic callback for Berkeley DB.  Not part of the backend API.
 static void bdb_dbpanic(DB_ENV *env, int errval) {
        syslog(LOG_ERR, "bdb: PANIC: %s", db_strerror(errval));
        bdb_abort();
 }
 
 
+// Close a cursor -- not part of the backend API.
 static void bdb_cclose(DBC *cursor) {
        int ret;
 
@@ -115,6 +141,8 @@ static void bdb_cclose(DBC *cursor) {
 }
 
 
+// Convenience function to abort if a cursor is still open when we didn't expect it to be.
+// This is not part of the backend API.
 static void bdb_bailIfCursor(DBC **cursors, const char *msg) {
        int i;
 
@@ -141,7 +169,7 @@ void bdb_checkpoint(void) {
        int ret;
 
        syslog(LOG_DEBUG, "bdb: -- checkpoint --");
-       ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
+       ret = bdb_env->txn_checkpoint(bdb_env, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
 
        if (ret != 0) {
                syslog(LOG_ERR, "bdb: bdb_checkpoint() txn_checkpoint: %s", db_strerror(ret));
@@ -150,15 +178,15 @@ void bdb_checkpoint(void) {
 
        // After a successful checkpoint, we can cull the unused logs
        if (CtdlGetConfigInt("c_auto_cull")) {
-               ret = dbenv->log_set_config(dbenv, DB_LOG_AUTO_REMOVE, 1);
+               ret = bdb_env->log_set_config(bdb_env, DB_LOG_AUTO_REMOVE, 1);
        }
        else {
-               ret = dbenv->log_set_config(dbenv, DB_LOG_AUTO_REMOVE, 0);
+               ret = bdb_env->log_set_config(bdb_env, DB_LOG_AUTO_REMOVE, 0);
        }
 }
 
 
-// Open the various databases we'll be using.  Any database which
+// Open the various tables we'll be using.  Any table which
 // does not exist should be created.  Note that we don't need a
 // critical section here, because there aren't any active threads
 // manipulating the database yet.
@@ -184,75 +212,62 @@ void bdb_open_databases(void) {
                exit(CTDLEXIT_DB);
        }
 
-       // Silently try to create the database subdirectory.  If it's already there, no problem.
-       if ((mkdir(ctdl_db_dir, 0700) != 0) && (errno != EEXIST)) {
-               syslog(LOG_ERR, "bdb: database directory [%s] does not exist and could not be created: %m", ctdl_db_dir);
-               exit(CTDLEXIT_DB);
-       }
-       if (chmod(ctdl_db_dir, 0700) != 0) {
-               syslog(LOG_ERR, "bdb: unable to set database directory permissions [%s]: %m", ctdl_db_dir);
-               exit(CTDLEXIT_DB);
-       }
-       if (chown(ctdl_db_dir, CTDLUID, (-1)) != 0) {
-               syslog(LOG_ERR, "bdb: unable to set the owner for [%s]: %m", ctdl_db_dir);
-               exit(CTDLEXIT_DB);
-       }
        syslog(LOG_DEBUG, "bdb: Setting up DB environment");
-       ret = db_env_create(&dbenv, 0);
+       ret = db_env_create(&bdb_env, 0);
        if (ret) {
                syslog(LOG_ERR, "bdb: db_env_create: %s", db_strerror(ret));
                syslog(LOG_ERR, "bdb: exit code %d", ret);
                exit(CTDLEXIT_DB);
        }
-       dbenv->set_errpfx(dbenv, "citserver");
-       dbenv->set_paniccall(dbenv, bdb_dbpanic);
-       dbenv->set_errcall(dbenv, bdb_verbose_err);
-       dbenv->set_msgcall(dbenv, bdb_verbose_log);
-       dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
-       dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
+       bdb_env->set_errpfx(bdb_env, "citserver");
+       bdb_env->set_paniccall(bdb_env, bdb_dbpanic);
+       bdb_env->set_errcall(bdb_env, bdb_verbose_err);
+       bdb_env->set_msgcall(bdb_env, bdb_verbose_log);
+       bdb_env->set_verbose(bdb_env, DB_VERB_DEADLOCK, 1);
+       bdb_env->set_verbose(bdb_env, DB_VERB_RECOVERY, 1);
 
        // We want to specify the shared memory buffer pool cachesize, but everything else is the default.
-       ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
+       ret = bdb_env->set_cachesize(bdb_env, 0, 64 * 1024, 0);
        if (ret) {
                syslog(LOG_ERR, "bdb: set_cachesize: %s", db_strerror(ret));
-               dbenv->close(dbenv, 0);
+               bdb_env->close(bdb_env, 0);
                syslog(LOG_ERR, "bdb: exit code %d", ret);
                exit(CTDLEXIT_DB);
        }
 
-       if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
+       if ((ret = bdb_env->set_lk_detect(bdb_env, DB_LOCK_DEFAULT))) {
                syslog(LOG_ERR, "bdb: set_lk_detect: %s", db_strerror(ret));
-               dbenv->close(dbenv, 0);
+               bdb_env->close(bdb_env, 0);
                syslog(LOG_ERR, "bdb: exit code %d", ret);
                exit(CTDLEXIT_DB);
        }
 
        flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_INIT_LOG;
-       syslog(LOG_DEBUG, "bdb: dbenv->open(dbenv, %s, %d, 0)", ctdl_db_dir, flags);
-       ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                                // try opening the database cleanly
+       syslog(LOG_DEBUG, "bdb: bdb_env->open(bdb_env, %s, %d, 0)", ctdl_db_dir, flags);
+       ret = bdb_env->open(bdb_env, ctdl_db_dir, flags, 0);                            // try opening the database cleanly
        if (ret == DB_RUNRECOVERY) {
-               syslog(LOG_ERR, "bdb: dbenv->open: %s", db_strerror(ret));
+               syslog(LOG_ERR, "bdb: bdb_env->open: %s", db_strerror(ret));
                syslog(LOG_ERR, "bdb: attempting recovery...");
                flags |= DB_RECOVER;
-               ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                        // try recovery
+               ret = bdb_env->open(bdb_env, ctdl_db_dir, flags, 0);                    // try recovery
        }
        if (ret == DB_RUNRECOVERY) {
-               syslog(LOG_ERR, "bdb: dbenv->open: %s", db_strerror(ret));
+               syslog(LOG_ERR, "bdb: bdb_env->open: %s", db_strerror(ret));
                syslog(LOG_ERR, "bdb: attempting catastrophic recovery...");
                flags &= ~DB_RECOVER;
                flags |= DB_RECOVER_FATAL;
-               ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                        // try catastrophic recovery
+               ret = bdb_env->open(bdb_env, ctdl_db_dir, flags, 0);                    // try catastrophic recovery
        }
        if (ret) {
-               syslog(LOG_ERR, "bdb: dbenv->open: %s", db_strerror(ret));
-               dbenv->close(dbenv, 0);
+               syslog(LOG_ERR, "bdb: bdb_env->open: %s", db_strerror(ret));
+               bdb_env->close(bdb_env, 0);
                syslog(LOG_ERR, "bdb: exit code %d", ret);
                exit(CTDLEXIT_DB);
        }
 
        syslog(LOG_INFO, "bdb: mounting databases");
        for (i = 0; i < MAXCDB; ++i) {
-               ret = db_create(&dbp[i], dbenv, 0);                                     // Create a database handle
+               ret = db_create(&bdb_table[i], bdb_env, 0);                             // Create a database handle
                if (ret) {
                        syslog(LOG_ERR, "bdb: db_create: %s", db_strerror(ret));
                        syslog(LOG_ERR, "bdb: exit code %d", ret);
@@ -260,7 +275,7 @@ void bdb_open_databases(void) {
                }
 
                snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);                 // table names by number
-               ret = dbp[i]->open(dbp[i], NULL, dbfilename, NULL, DB_BTREE, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0600);
+               ret = bdb_table[i]->open(bdb_table[i], NULL, dbfilename, NULL, DB_BTREE, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0600);
                if (ret) {
                        syslog(LOG_ERR, "bdb: db_open[%02x]: %s", i, db_strerror(ret));
                        if (ret == ENOMEM) {
@@ -273,26 +288,6 @@ void bdb_open_databases(void) {
 }
 
 
-// Make sure we own all the files, because in a few milliseconds we're going to drop root privs.
-void bdb_chmod_data(void) {
-       DIR *dp;
-       struct dirent *d;
-       char filename[PATH_MAX];
-
-       dp = opendir(ctdl_db_dir);
-       if (dp != NULL) {
-               while (d = readdir(dp), d != NULL) {
-                       if (d->d_name[0] != '.') {
-                               snprintf(filename, sizeof filename, "%s/%s", ctdl_db_dir, d->d_name);
-                               syslog(LOG_DEBUG, "bdb: chmod(%s, 0600) returned %d", filename, chmod(filename, 0600));
-                               syslog(LOG_DEBUG, "bdb: chown(%s, CTDLUID, -1) returned %d", filename, chown(filename, CTDLUID, (-1)));
-                       }
-               }
-               closedir(dp);
-       }
-}
-
-
 // Close all of the db database files we've opened.  This can be done in a loop, since it's just a bunch of closes.
 void bdb_close_databases(void) {
        int i;
@@ -305,12 +300,12 @@ void bdb_close_databases(void) {
        closing = 1;
 
        syslog(LOG_INFO, "bdb: performing final checkpoint");
-       if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
+       if ((ret = bdb_env->txn_checkpoint(bdb_env, 0, 0, 0))) {
                syslog(LOG_ERR, "bdb: txn_checkpoint: %s", db_strerror(ret));
        }
 
        syslog(LOG_INFO, "bdb: flushing the database logs");
-       if ((ret = dbenv->log_flush(dbenv, NULL))) {
+       if ((ret = bdb_env->log_flush(bdb_env, NULL))) {
                syslog(LOG_ERR, "bdb: log_flush: %s", db_strerror(ret));
        }
 
@@ -318,25 +313,25 @@ void bdb_close_databases(void) {
        syslog(LOG_INFO, "bdb: closing databases");
        for (i = 0; i < MAXCDB; ++i) {
                syslog(LOG_INFO, "bdb: closing database %02x", i);
-               ret = dbp[i]->close(dbp[i], 0);
+               ret = bdb_table[i]->close(bdb_table[i], 0);
                if (ret) {
                        syslog(LOG_ERR, "bdb: db_close: %s", db_strerror(ret));
                }
        }
 
        // Close the handle.
-       ret = dbenv->close(dbenv, DB_FORCESYNC);
+       ret = bdb_env->close(bdb_env, DB_FORCESYNC);
        if (ret) {
                syslog(LOG_ERR, "bdb: DBENV->close: %s", db_strerror(ret));
        }
 }
 
 
-// Decompress a database item if it was compressed on disk
-void bdb_decompress_if_necessary(struct cdbdata *cdb) {
+// Decompress a DBT data block if it was compressed on disk.
+void bdb_decompress_if_necessary(DBT *d) {
        static int magic = COMPRESS_MAGIC;
 
-       if ((cdb == NULL) || (cdb->ptr == NULL) || (cdb->len < sizeof(magic)) || (memcmp(cdb->ptr, &magic, sizeof(magic)))) {
+       if ((d == NULL) || (d->data == NULL) || (d->size < sizeof(magic)) || (memcmp(d->data, &magic, sizeof(magic)))) {
                return;
        }
 
@@ -350,12 +345,12 @@ void bdb_decompress_if_necessary(struct cdbdata *cdb) {
 
        memset(&zheader, 0, sizeof(struct CtdlCompressHeader));
        cplen = sizeof(struct CtdlCompressHeader);
-       if (sizeof(struct CtdlCompressHeader) > cdb->len) {
-               cplen = cdb->len;
+       if (sizeof(struct CtdlCompressHeader) > d->size) {
+               cplen = d->size;
        }
-       memcpy(&zheader, cdb->ptr, cplen);
+       memcpy(&zheader, d->data, cplen);
 
-       compressed_data = cdb->ptr;
+       compressed_data = d->data;
        compressed_data += sizeof(struct CtdlCompressHeader);
 
        sourceLen = (uLongf) zheader.compressed_len;
@@ -368,9 +363,9 @@ void bdb_decompress_if_necessary(struct cdbdata *cdb) {
                bdb_abort();
        }
 
-       free(cdb->ptr);
-       cdb->len = (size_t) destLen;
-       cdb->ptr = uncompressed_data;
+       free(d->data);
+       d->size = (size_t) destLen;
+       d->data = uncompressed_data;
 }
 
 
@@ -414,11 +409,11 @@ int bdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
        }
 
        if (TSD->tid != NULL) {
-               ret = dbp[cdb]->put(dbp[cdb],   // db
-                                   TSD->tid,   // transaction ID
-                                   &dkey,      // key
-                                   &ddata,     // data
-                                   0           // flags
+               ret = bdb_table[cdb]->put(bdb_table[cdb],       // db
+                                   TSD->tid,                   // transaction ID
+                                   &dkey,                      // key
+                                   &ddata,                     // data
+                                   0                           // flags
                );
                if (ret) {
                        syslog(LOG_ERR, "bdb: bdb_store(%d): %s", cdb, db_strerror(ret));
@@ -435,11 +430,11 @@ int bdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
              retry:
                bdb_txbegin(&tid);
 
-               if ((ret = dbp[cdb]->put(dbp[cdb],      // db
-                                        tid,           // transaction ID
-                                        &dkey,         // key
-                                        &ddata,        // data
-                                        0))) {         // flags
+               if ((ret = bdb_table[cdb]->put(bdb_table[cdb],  // db
+                                        tid,                   // transaction ID
+                                        &dkey,                 // key
+                                        &ddata,                // data
+                                        0))) {                 // flags
                        if (ret == DB_LOCK_DEADLOCK) {
                                bdb_txabort(tid);
                                goto retry;
@@ -472,7 +467,7 @@ int bdb_delete(int cdb, void *key, int keylen) {
        dkey.data = key;
 
        if (TSD->tid != NULL) {
-               ret = dbp[cdb]->del(dbp[cdb], TSD->tid, &dkey, 0);
+               ret = bdb_table[cdb]->del(bdb_table[cdb], TSD->tid, &dkey, 0);
                if (ret) {
                        syslog(LOG_ERR, "bdb: bdb_delete(%d): %s", cdb, db_strerror(ret));
                        if (ret != DB_NOTFOUND) {
@@ -486,7 +481,7 @@ int bdb_delete(int cdb, void *key, int keylen) {
              retry:
                bdb_txbegin(&tid);
 
-               if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0)) && ret != DB_NOTFOUND) {
+               if ((ret = bdb_table[cdb]->del(bdb_table[cdb], tid, &dkey, 0)) && ret != DB_NOTFOUND) {
                        if (ret == DB_LOCK_DEADLOCK) {
                                bdb_txabort(tid);
                                goto retry;
@@ -509,7 +504,7 @@ static DBC *bdb_localcursor(int cdb) {
        DBC *curs;
 
        if (TSD->cursors[cdb] == NULL) {
-               ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &curs, 0);
+               ret = bdb_table[cdb]->cursor(bdb_table[cdb], TSD->tid, &curs, 0);
        }
        else {
                ret = TSD->cursors[cdb]->c_dup(TSD->cursors[cdb], &curs, DB_POSITION);
@@ -534,7 +529,7 @@ struct cdbdata *bdb_fetch(int cdb, const void *key, int keylen) {
        }
 
        struct cdbdata *tempcdb;
-       DBT dkey, dret;
+       DBT dkey;
        int ret;
 
        memset(&dkey, 0, sizeof(DBT));
@@ -542,18 +537,16 @@ struct cdbdata *bdb_fetch(int cdb, const void *key, int keylen) {
        dkey.data = (void *) key;
 
        if (TSD->tid != NULL) {
-               memset(&dret, 0, sizeof(DBT));
-               dret.flags = DB_DBT_MALLOC;
-               ret = dbp[cdb]->get(dbp[cdb], TSD->tid, &dkey, &dret, 0);
+               TSD->dbdata[cdb].flags = DB_DBT_REALLOC;
+               ret = bdb_table[cdb]->get(bdb_table[cdb], TSD->tid, &dkey, &TSD->dbdata[cdb], 0);
        }
        else {
                DBC *curs;
 
                do {
-                       memset(&dret, 0, sizeof(DBT));
-                       dret.flags = DB_DBT_MALLOC;
+                       TSD->dbdata[cdb].flags = DB_DBT_REALLOC;
                        curs = bdb_localcursor(cdb);
-                       ret = curs->c_get(curs, &dkey, &dret, DB_SET);
+                       ret = curs->c_get(curs, &dkey, &TSD->dbdata[cdb], DB_SET);
                        bdb_cclose(curs);
                } while (ret == DB_LOCK_DEADLOCK);
        }
@@ -567,30 +560,23 @@ struct cdbdata *bdb_fetch(int cdb, const void *key, int keylen) {
                return NULL;
        }
 
+       bdb_decompress_if_necessary(&TSD->dbdata[cdb]);
+
        tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
        if (tempcdb == NULL) {
                syslog(LOG_ERR, "bdb: bdb_fetch() cannot allocate memory for tempcdb: %m");
                bdb_abort();
        }
        else {
-               tempcdb->len = dret.size;
-               tempcdb->ptr = dret.data;
-               bdb_decompress_if_necessary(tempcdb);
+               tempcdb->len = TSD->dbdata[cdb].size;
+               tempcdb->ptr = TSD->dbdata[cdb].data;
                return (tempcdb);
        }
 }
 
 
 // Free a cdbdata item.
-//
-// Note that we only free the 'ptr' portion if it is not NULL.  This allows
-// other code to assume ownership of that memory simply by storing the
-// pointer elsewhere and then setting 'ptr' to NULL.  bdb_free() will then
-// avoid freeing it.
 void bdb_free(struct cdbdata *cdb) {
-       if (cdb->ptr) {
-               free(cdb->ptr);
-       }
        free(cdb);
 }
 
@@ -605,8 +591,7 @@ void bdb_close_cursor(int cdb) {
 
 
 // Prepare for a sequential search of an entire database.
-// (There is guaranteed to be no more than one traversal in
-// progress per thread at any given time.)
+// (There is guaranteed to be no more than one traversal in progress per thread at any given time.)
 void bdb_rewind(int cdb) {
        int ret = 0;
 
@@ -617,7 +602,7 @@ void bdb_rewind(int cdb) {
        }
 
        // Now initialize the cursor
-       ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &TSD->cursors[cdb], 0);
+       ret = bdb_table[cdb]->cursor(bdb_table[cdb], TSD->tid, &TSD->cursors[cdb], 0);
        if (ret) {
                syslog(LOG_ERR, "bdb: bdb_rewind: db_cursor: %s", db_strerror(ret));
                bdb_abort();
@@ -628,16 +613,14 @@ void bdb_rewind(int cdb) {
 // Fetch the next item in a sequential search.  Returns a pointer to a 
 // cdbdata structure, or NULL if we've hit the end.
 struct cdbdata *bdb_next_item(int cdb) {
-       DBT key, data;
        struct cdbdata *cdbret;
        int ret = 0;
 
-       // Initialize the key/data pair so the flags aren't set.
-       memset(&key, 0, sizeof(key));
-       memset(&data, 0, sizeof(data));
-       data.flags = DB_DBT_MALLOC;
+       // reuse memory from the previous call.
+       TSD->dbkey[cdb].flags = DB_DBT_MALLOC;
+       TSD->dbdata[cdb].flags = DB_DBT_MALLOC;
 
-       ret = TSD->cursors[cdb]->c_get(TSD->cursors[cdb], &key, &data, DB_NEXT);
+       ret = TSD->cursors[cdb]->c_get(TSD->cursors[cdb], &TSD->dbkey[cdb], &TSD->dbdata[cdb], DB_NEXT);
 
        if (ret) {
                if (ret != DB_NOTFOUND) {
@@ -645,13 +628,14 @@ struct cdbdata *bdb_next_item(int cdb) {
                        bdb_abort();
                }
                bdb_close_cursor(cdb);
-               return NULL;    // presumably, end of file
+               return NULL;    // presumably, we are at the end
        }
 
+       bdb_decompress_if_necessary(&TSD->dbdata[cdb]);
+
        cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
-       cdbret->len = data.size;
-       cdbret->ptr = data.data;
-       bdb_decompress_if_necessary(cdbret);
+       cdbret->len = TSD->dbdata[cdb].size;
+       cdbret->ptr = TSD->dbdata[cdb].data;
 
        return (cdbret);
 }
@@ -707,10 +691,10 @@ void bdb_trunc(int cdb) {
 
              retry:
 
-               if ((ret = dbp[cdb]->truncate(dbp[cdb], // db
-                                             NULL,     // transaction ID
-                                             &count,   // #rows deleted
-                                             0))) {    // flags
+               if ((ret = bdb_table[cdb]->truncate(bdb_table[cdb],     // db
+                                             NULL,                     // transaction ID
+                                             &count,                   // #rows deleted
+                                             0))) {                    // flags
                        if (ret == DB_LOCK_DEADLOCK) {
                                goto retry;
                        }
@@ -734,7 +718,7 @@ void bdb_compact(void) {
        syslog(LOG_DEBUG, "bdb: bdb_compact() started");
        for (i = 0; i < MAXCDB; i++) {
                syslog(LOG_DEBUG, "bdb: compacting database %d", i);
-               ret = dbp[i]->compact(dbp[i], NULL, NULL, NULL, NULL, DB_FREE_SPACE, NULL);
+               ret = bdb_table[i]->compact(bdb_table[i], NULL, NULL, NULL, NULL, DB_FREE_SPACE, NULL);
                if (ret) {
                        syslog(LOG_ERR, "bdb: compact: %s", db_strerror(ret));
                }
@@ -745,6 +729,8 @@ void bdb_compact(void) {
 
 // Calling this function activates the Berkeley DB back end.
 void bdb_init_backend(void) {
+
+       // Assign the backend API stubs to the functions in this module.
        cdb_compact = bdb_compact;
        cdb_checkpoint = bdb_checkpoint;
        cdb_rewind = bdb_rewind;
@@ -760,9 +746,13 @@ void bdb_init_backend(void) {
        cdb_end_transaction = bdb_end_transaction;
        cdb_check_handles = bdb_check_handles;
        cdb_trunc = bdb_trunc;
-       cdb_chmod_data = bdb_chmod_data;
+
+       // Some functions in this backend need to store some per-thread data.
+       // We crerate the key here, during module initialization.
+       if (pthread_key_create(&bdb_thread_key, NULL) != 0) {
+               syslog(LOG_ERR, "pthread_key_create() : %m");
+               exit(CTDLEXIT_THREAD);
+       }
 
        syslog(LOG_INFO, "db: initialized Berkeley DB backend");
 }
-
-