X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fdatabase_sleepycat.c;h=0573ae5de032cfce2ac565ba0e96768a3459caf5;hb=11219dbc3e638e7ee47feffbbb7d63588d7dd77f;hp=dc77215e7a4e8e6f1ed6954db26ed4509ca06fc5;hpb=425f004d7eab12dcddb0a10771b3fbd2fb1a6db8;p=citadel.git diff --git a/citadel/database_sleepycat.c b/citadel/database_sleepycat.c index dc77215e7..0573ae5de 100644 --- a/citadel/database_sleepycat.c +++ b/citadel/database_sleepycat.c @@ -1,109 +1,406 @@ /* * $Id$ * - * Sleepycat (Berkeley) DB driver for Citadel/UX + * Sleepycat (Berkeley) DB driver for Citadel * */ +/***************************************************************************** + Tunable configuration parameters for the Sleepycat DB back end + *****************************************************************************/ + +/* Citadel will checkpoint the db at the end of every session, but only if + * the specified number of kilobytes has been written, or if the specified + * number of minutes has passed, since the last checkpoint. + */ +#define MAX_CHECKPOINT_KBYTES 256 +#define MAX_CHECKPOINT_MINUTES 15 + +/*****************************************************************************/ + #include "sysdep.h" #include #include #include -#include #include #include #include +#include +#include +#include + +#ifdef HAVE_DB_H #include +#elif defined(HAVE_DB4_DB_H) +#include +#else +#error Neither nor was found by configure. Install db4-devel. +#endif + + +#if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1 +#error Citadel requires Berkeley DB v4.1 or newer. Please upgrade. +#endif + + +#include #include "citadel.h" #include "server.h" +#include "serv_extensions.h" #include "citserver.h" #include "database.h" +#include "msgbase.h" #include "sysdep_decls.h" +#include "config.h" + +static DB *dbp[MAXCDB]; /* One DB handle for each Citadel database */ +static DB_ENV *dbenv; /* The DB environment (global) */ + +struct cdbtsd { /* Thread-specific DB stuff */ + DB_TXN *tid; /* Transaction handle */ + DBC *cursors[MAXCDB]; /* Cursors, for traversals... */ +}; + +#ifdef HAVE_ZLIB +#include +#endif + +static pthread_key_t tsdkey; + +#define MYCURSORS (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors) +#define MYTID (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid) + +/* just a little helper function */ +static void txabort(DB_TXN * tid) +{ + int ret; + + ret = tid->abort(tid); + + if (ret) { + lprintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n", + db_strerror(ret)); + abort(); + } +} + +/* this one is even more helpful than the last. */ +static void txcommit(DB_TXN * tid) +{ + int ret; + + ret = tid->commit(tid, 0); + + if (ret) { + lprintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n", + db_strerror(ret)); + abort(); + } +} + +/* are you sensing a pattern yet? */ +static void txbegin(DB_TXN ** tid) +{ + int ret; + + ret = dbenv->txn_begin(dbenv, NULL, tid, 0); + + if (ret) { + lprintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n", + db_strerror(ret)); + abort(); + } +} + +static void dbpanic(DB_ENV * env, int errval) +{ + lprintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval); +} + +static void cclose(DBC * cursor) +{ + int ret; + + if ((ret = cursor->c_close(cursor))) { + lprintf(CTDL_EMERG, "cdb_*: c_close: %s\n", + db_strerror(ret)); + abort(); + } +} + +static void bailIfCursor(DBC ** cursors, const char *msg) +{ + int i; + + for (i = 0; i < MAXCDB; i++) + if (cursors[i] != NULL) { + lprintf(CTDL_EMERG, + "cdb_*: cursor still in progress on cdb %d: %s\n", + i, msg); + abort(); + } +} + +static void check_handles(void *arg) +{ + if (arg != NULL) { + struct cdbtsd *tsd = (struct cdbtsd *) arg; + bailIfCursor(tsd->cursors, "in check_handles"); + + if (tsd->tid != NULL) { + lprintf(CTDL_EMERG, + "cdb_*: transaction still in progress!"); + abort(); + } + } +} + +static void dest_tsd(void *arg) +{ + if (arg != NULL) { + check_handles(arg); + free(arg); + } +} /* - * This array holds one DB handle for each Citadel database. + * Ensure that we have a key for thread-specific data. We don't + * put anything in here that Citadel cares about; this is just database + * related stuff like cursors and transactions. + * + * This should be called immediately after startup by any thread which wants + * to use database calls, except for whatever thread calls open_databases. */ -DB *dbp[MAXCDB]; +void cdb_allocate_tsd(void) +{ + struct cdbtsd *tsd; + + if (pthread_getspecific(tsdkey) != NULL) + return; + + tsd = malloc(sizeof(struct cdbtsd)); + + tsd->tid = NULL; + + memset(tsd->cursors, 0, sizeof tsd->cursors); + pthread_setspecific(tsdkey, tsd); +} -DB_ENV *dbenv; +void cdb_free_tsd(void) +{ + dest_tsd(pthread_getspecific(tsdkey)); + pthread_setspecific(tsdkey, NULL); +} + +void cdb_check_handles(void) +{ + check_handles(pthread_getspecific(tsdkey)); +} -DBC *cursorz[999]; /* FIXME !! */ -#define MYCURSOR cursorz[CC->cs_pid] /* * Reclaim unused space in the databases. We need to do each one of * these discretely, rather than in a loop. + * + * This is a stub function in the Sleepycat DB backend, because there is no + * such API call available. */ void defrag_databases(void) { - /* FIXME ... do we even need this? If not, we'll just keep it as - * a stub function to keep the API consistent. - */ + /* do nothing */ } +/* + * Cull the database logs + */ +static void cdb_cull_logs(void) +{ + u_int32_t flags; + int ret; + char **file, **list; + char errmsg[SIZ]; + + flags = DB_ARCH_ABS; + + /* Get the list of names. */ + if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) { + lprintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret)); + return; + } + + /* Print the list of names. */ + if (list != NULL) { + for (file = list; *file != NULL; ++file) { + lprintf(CTDL_DEBUG, "Deleting log: %s\n", *file); + ret = unlink(*file); + if (ret != 0) { + snprintf(errmsg, sizeof(errmsg), + " ** ERROR **\n \n \n " + "Citadel was unable to delete the " + "database log file '%s' because of the " + "following error:\n \n %s\n \n" + " This log file is no longer in use " + "and may be safely deleted.\n", + *file, strerror(errno)); + aide_message(errmsg, "Database Warning Message"); + } + } + free(list); + } +} + +/* + * Manually initiate log file cull. + */ +void cmd_cull(char *argbuf) { + if (CtdlAccessCheck(ac_internal)) return; + cdb_cull_logs(); + cprintf("%d Database log file cull completed.\n", CIT_OK); +} + + +/* + * Request a checkpoint of the database. + */ +static void cdb_checkpoint(void) +{ + int ret; + static time_t last_run = 0L; + + /* Only do a checkpoint once per minute. */ + if ((time(NULL) - last_run) < 60L) { + return; + } + last_run = time(NULL); + + lprintf(CTDL_DEBUG, "-- db checkpoint --\n"); + ret = dbenv->txn_checkpoint(dbenv, + MAX_CHECKPOINT_KBYTES, + MAX_CHECKPOINT_MINUTES, 0); + + if (ret != 0) { + lprintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", + db_strerror(ret)); + abort(); + } + + /* After a successful checkpoint, we can cull the unused logs */ + if (config.c_auto_cull) { + cdb_cull_logs(); + } +} + + +/* + * Main loop for the checkpoint thread. + */ +void *checkpoint_thread(void *arg) { + struct CitContext checkpointCC; + + lprintf(CTDL_DEBUG, "checkpoint_thread() initializing\n"); + + memset(&checkpointCC, 0, sizeof(struct CitContext)); + checkpointCC.internal_pgm = 1; + checkpointCC.cs_pid = 0; + pthread_setspecific(MyConKey, (void *)&checkpointCC ); + + cdb_allocate_tsd(); + + while (!time_to_die) { + cdb_checkpoint(); + sleep(1); + } + + lprintf(CTDL_DEBUG, "checkpoint_thread() exiting\n"); + pthread_exit(NULL); +} + /* * Open the various databases we'll be using. Any database which - * does not exist should be created. Note that we don't need an S_DATABASE - * critical section here, because there aren't any active threads manipulating - * the database yet -- and besides, it causes problems on BSDI. + * does not exist should be created. Note that we don't need a + * critical section here, because there aren't any active threads + * manipulating the database yet. */ void open_databases(void) { int ret; int i; - char dbfilename[256]; - - /* - * Silently try to create the database subdirectory. If it's - * already there, no problem. - */ - system("exec mkdir data 2>/dev/null"); + char dbfilename[SIZ]; + u_int32_t flags = 0; + DIR *dp; + struct dirent *d; + char filename[PATH_MAX]; + + lprintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n"); + lprintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING); + lprintf(CTDL_INFO, " Linked db: %s\n", + db_version(NULL, NULL, NULL)); +#ifdef HAVE_ZLIB + lprintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion()); +#endif + + /* + * Silently try to create the database subdirectory. If it's + * already there, no problem. + */ + mkdir(ctdl_data_dir, 0700); + chmod(ctdl_data_dir, 0700); + chown(ctdl_data_dir, CTDLUID, (-1)); - lprintf(9, "Setting up DB environment\n"); + lprintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n"); + db_env_set_func_yield(sched_yield); ret = db_env_create(&dbenv, 0); if (ret) { - lprintf(1, "db_env_create: %s\n", db_strerror(ret)); + lprintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n", + db_strerror(ret)); exit(ret); } - dbenv->set_errfile(dbenv, stderr); /* FIXME */ dbenv->set_errpfx(dbenv, "citserver"); + dbenv->set_paniccall(dbenv, dbpanic); - /* - * We want to specify the shared memory buffer pool cachesize, - * but everything else is the default. - */ - ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0); + /* + * We want to specify the shared memory buffer pool cachesize, + * but everything else is the default. + */ + ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0); if (ret) { - lprintf(1, "set_cachesize: %s\n", db_strerror(ret)); - dbenv->close(dbenv, 0); - exit(ret); - } - - /* - * We have multiple processes reading/writing these files, so - * we need concurrency control and a shared buffer pool, but - * not logging or transactions. - */ - /* (void)dbenv->set_data_dir(dbenv, "/database/files"); */ - ret = dbenv->open(dbenv, "./data", - ( DB_CREATE | DB_INIT_MPOOL ), - 0); + lprintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n", + db_strerror(ret)); + dbenv->close(dbenv, 0); + exit(ret); + } + + if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) { + lprintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n", + db_strerror(ret)); + dbenv->close(dbenv, 0); + exit(ret); + } + + flags = + DB_CREATE | DB_RECOVER | DB_INIT_MPOOL | DB_PRIVATE | + DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD; + lprintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, + flags); + ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0); if (ret) { - lprintf(1, "dbenv->open: %s\n", db_strerror(ret)); - dbenv->close(dbenv, 0); - exit(ret); - } + lprintf(CTDL_DEBUG, "cdb_*: dbenv->open: %s\n", + db_strerror(ret)); + dbenv->close(dbenv, 0); + exit(ret); + } - lprintf(7, "Starting up DB\n"); + lprintf(CTDL_INFO, "cdb_*: Starting up DB\n"); for (i = 0; i < MAXCDB; ++i) { /* Create a database handle */ ret = db_create(&dbp[i], dbenv, 0); if (ret) { - lprintf(1, "db_create: %s\n", db_strerror(ret)); + lprintf(CTDL_DEBUG, "cdb_*: db_create: %s\n", + db_strerror(ret)); exit(ret); } @@ -111,26 +408,54 @@ void open_databases(void) /* Arbitrary names for our tables -- we reference them by * number, so we don't have string names for them. */ - sprintf(dbfilename, "cdb.%02x", i); + snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i); ret = dbp[i]->open(dbp[i], - dbfilename, - NULL, - DB_BTREE, - DB_CREATE, - 0600); + NULL, + dbfilename, + NULL, + DB_BTREE, + DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, + 0600); if (ret) { - lprintf(1, "db_open[%d]: %s\n", i, db_strerror(ret)); + lprintf(CTDL_EMERG, "cdb_*: db_open[%d]: %s\n", i, + db_strerror(ret)); exit(ret); } + } + + if ((ret = pthread_key_create(&tsdkey, dest_tsd))) { + lprintf(CTDL_EMERG, "cdb_*: pthread_key_create: %s\n", + strerror(ret)); + exit(1); + } + + cdb_allocate_tsd(); + /* Now make sure we own all the files, because in a few milliseconds + * we're going to drop root privs. + */ + dp = opendir(ctdl_data_dir); + if (dp != NULL) { + while (d = readdir(dp), d != NULL) { + if (d->d_name[0] != '.') { + snprintf(filename, sizeof filename, + "%s/%s", ctdl_data_dir, d->d_name); + chmod(filename, 0600); + chown(filename, CTDLUID, (-1)); + } + } + closedir(dp); } + lprintf(CTDL_DEBUG, "cdb_*: open_databases() finished\n"); + + CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs"); } /* - * Close all of the gdbm database files we've opened. This can be done + * Close all of the db database files we've opened. This can be done * in a loop, since it's just a bunch of closes. */ void close_databases(void) @@ -138,42 +463,97 @@ void close_databases(void) int a; int ret; - begin_critical_section(S_DATABASE); + cdb_free_tsd(); + + if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) { + lprintf(CTDL_EMERG, + "cdb_*: txn_checkpoint: %s\n", db_strerror(ret)); + } + for (a = 0; a < MAXCDB; ++a) { - lprintf(7, "Closing database %d\n", a); + lprintf(CTDL_INFO, "cdb_*: Closing database %d\n", a); ret = dbp[a]->close(dbp[a], 0); if (ret) { - lprintf(1, "db_close: %s\n", db_strerror(ret)); + lprintf(CTDL_EMERG, + "cdb_*: db_close: %s\n", db_strerror(ret)); } - - } - + } - /* Close the handle. */ - ret = dbenv->close(dbenv, 0); + /* Close the handle. */ + ret = dbenv->close(dbenv, 0); if (ret) { - lprintf(1, "DBENV->close: %s\n", db_strerror(ret)); - } + lprintf(CTDL_EMERG, + "cdb_*: DBENV->close: %s\n", db_strerror(ret)); + } +} + +/* + * Compression functions only used if we have zlib + */ +#ifdef HAVE_ZLIB - end_critical_section(S_DATABASE); +void cdb_decompress_if_necessary(struct cdbdata *cdb) +{ + static int magic = COMPRESS_MAGIC; + struct CtdlCompressHeader zheader; + char *uncompressed_data; + char *compressed_data; + uLongf destLen, sourceLen; + + if (cdb == NULL) + return; + if (cdb->ptr == NULL) + return; + if (memcmp(cdb->ptr, &magic, sizeof(magic))) + return; + + /* At this point we know we're looking at a compressed item. */ + memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader)); + + compressed_data = cdb->ptr; + compressed_data += sizeof(struct CtdlCompressHeader); + + sourceLen = (uLongf) zheader.compressed_len; + destLen = (uLongf) zheader.uncompressed_len; + uncompressed_data = malloc(zheader.uncompressed_len); + + if (uncompress((Bytef *) uncompressed_data, + (uLongf *) & destLen, + (const Bytef *) compressed_data, + (uLong) sourceLen) != Z_OK) { + lprintf(CTDL_EMERG, "uncompress() error\n"); + abort(); + } + free(cdb->ptr); + cdb->len = (size_t) destLen; + cdb->ptr = uncompressed_data; } +#endif /* HAVE_ZLIB */ + /* * Store a piece of data. Returns 0 if the operation was successful. If a * key already exists it should be overwritten. */ -int cdb_store(int cdb, - void *ckey, int ckeylen, - void *cdata, int cdatalen) +int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen) { DBT dkey, ddata; + DB_TXN *tid; int ret; +#ifdef HAVE_ZLIB + struct CtdlCompressHeader zheader; + char *compressed_data = NULL; + int compressing = 0; + size_t buffer_len; + uLongf destLen; +#endif + memset(&dkey, 0, sizeof(DBT)); memset(&ddata, 0, sizeof(DBT)); dkey.size = ckeylen; @@ -181,21 +561,79 @@ int cdb_store(int cdb, ddata.size = cdatalen; ddata.data = cdata; - begin_critical_section(S_DATABASE); - lprintf(9, "cdb_store(%d) ...\n", cdb); - ret = dbp[cdb]->put(dbp[cdb], /* db */ - NULL, /* transaction ID (hmm...) */ - &dkey, /* key */ - &ddata, /* data */ - 0); /* flags */ - end_critical_section(S_DATABASE); - lprintf(9, "...put ( to file %d) returned %3d (%d bytes)\n", - cdb, ret, ddata.size); - if (ret) { - lprintf(1, "cdb_store: %s\n", db_strerror(ret)); - return (-1); +#ifdef HAVE_ZLIB + /* Only compress Visit records. Everything else is uncompressed. */ + if (cdb == CDB_VISIT) { + compressing = 1; + zheader.magic = COMPRESS_MAGIC; + zheader.uncompressed_len = cdatalen; + buffer_len = ((cdatalen * 101) / 100) + 100 + + sizeof(struct CtdlCompressHeader); + destLen = (uLongf) buffer_len; + compressed_data = malloc(buffer_len); + if (compress2((Bytef *) (compressed_data + + sizeof(struct + CtdlCompressHeader)), + &destLen, (Bytef *) cdata, (uLongf) cdatalen, + 1) != Z_OK) { + lprintf(CTDL_EMERG, "compress2() error\n"); + abort(); + } + zheader.compressed_len = (size_t) destLen; + memcpy(compressed_data, &zheader, + sizeof(struct CtdlCompressHeader)); + ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + + zheader.compressed_len); + ddata.data = compressed_data; + } +#endif + + if (MYTID != NULL) { + ret = dbp[cdb]->put(dbp[cdb], /* db */ + MYTID, /* transaction ID */ + &dkey, /* key */ + &ddata, /* data */ + 0); /* flags */ + if (ret) { + lprintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, + db_strerror(ret)); + abort(); + } +#ifdef HAVE_ZLIB + if (compressing) + free(compressed_data); +#endif + return ret; + + } else { + bailIfCursor(MYCURSORS, + "attempt to write during r/o cursor"); + + retry: + txbegin(&tid); + + if ((ret = dbp[cdb]->put(dbp[cdb], /* db */ + tid, /* transaction ID */ + &dkey, /* key */ + &ddata, /* data */ + 0))) { /* flags */ + if (ret == DB_LOCK_DEADLOCK) { + txabort(tid); + goto retry; + } else { + lprintf(CTDL_EMERG, "cdb_store(%d): %s\n", + cdb, db_strerror(ret)); + abort(); + } + } else { + txcommit(tid); +#ifdef HAVE_ZLIB + if (compressing) + free(compressed_data); +#endif + return ret; + } } - return (0); } @@ -206,21 +644,64 @@ int cdb_delete(int cdb, void *key, int keylen) { DBT dkey; + DB_TXN *tid; int ret; + memset(&dkey, 0, sizeof dkey); dkey.size = keylen; dkey.data = key; - begin_critical_section(S_DATABASE); - lprintf(9, "cdb_delete(%d) ...\n", cdb); - ret = dbp[cdb]->del(dbp[cdb], NULL, &dkey, 0); - lprintf(9, "cdb_delete returned %d\n", ret); - end_critical_section(S_DATABASE); - return (ret); - + if (MYTID != NULL) { + ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0); + if (ret) { + lprintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, + db_strerror(ret)); + if (ret != DB_NOTFOUND) + abort(); + } + } else { + bailIfCursor(MYCURSORS, + "attempt to delete during r/o cursor"); + + retry: + txbegin(&tid); + + if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0)) + && ret != DB_NOTFOUND) { + if (ret == DB_LOCK_DEADLOCK) { + txabort(tid); + goto retry; + } else { + lprintf(CTDL_EMERG, "cdb_delete(%d): %s\n", + cdb, db_strerror(ret)); + abort(); + } + } else { + txcommit(tid); + } + } + return ret; } +static DBC *localcursor(int cdb) +{ + int ret; + DBC *curs; + + if (MYCURSORS[cdb] == NULL) + ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0); + else + ret = + MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, + DB_POSITION); + if (ret) { + lprintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret)); + abort(); + } + + return curs; +} /* @@ -236,57 +717,104 @@ struct cdbdata *cdb_fetch(int cdb, void *key, int keylen) int ret; memset(&dkey, 0, sizeof(DBT)); - memset(&dret, 0, sizeof(DBT)); dkey.size = keylen; dkey.data = key; - dret.flags = DB_DBT_MALLOC; - - begin_critical_section(S_DATABASE); - lprintf(9, "cdb_fetch(%d) ...\n", cdb); - ret = dbp[cdb]->get(dbp[cdb], NULL, &dkey, &dret, 0); - end_critical_section(S_DATABASE); - lprintf(9, "get (from file %d) returned %3d (%d bytes)\n", - cdb, ret, dret.size); + + if (MYTID != NULL) { + memset(&dret, 0, sizeof(DBT)); + dret.flags = DB_DBT_MALLOC; + ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0); + } else { + DBC *curs; + + do { + memset(&dret, 0, sizeof(DBT)); + dret.flags = DB_DBT_MALLOC; + + curs = localcursor(cdb); + + ret = curs->c_get(curs, &dkey, &dret, DB_SET); + cclose(curs); + } + while (ret == DB_LOCK_DEADLOCK); + + } + if ((ret != 0) && (ret != DB_NOTFOUND)) { - lprintf(1, "cdb_fetch: %s\n", db_strerror(ret)); + lprintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb, + db_strerror(ret)); + abort(); } - if (ret != 0) return NULL; - tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata)); + + if (ret != 0) + return NULL; + tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata)); + if (tempcdb == NULL) { - lprintf(2, "Cannot allocate memory!\n"); + lprintf(CTDL_EMERG, + "cdb_fetch: Cannot allocate memory for tempcdb\n"); + abort(); } + tempcdb->len = dret.size; tempcdb->ptr = dret.data; +#ifdef HAVE_ZLIB + cdb_decompress_if_necessary(tempcdb); +#endif return (tempcdb); } /* - * Free a cdbdata item (ok, this is really no big deal, but we might need to do - * more complex stuff with other database managers in the future). + * Free a cdbdata item. + * + * Note that we only free the 'ptr' portion if it is not NULL. This allows + * other code to assume ownership of that memory simply by storing the + * pointer elsewhere and then setting 'ptr' to NULL. cdb_free() will then + * avoid freeing it. */ void cdb_free(struct cdbdata *cdb) { - phree(cdb->ptr); - phree(cdb); + if (cdb->ptr) { + free(cdb->ptr); + } + free(cdb); } +void cdb_close_cursor(int cdb) +{ + if (MYCURSORS[cdb] != NULL) + cclose(MYCURSORS[cdb]); + + MYCURSORS[cdb] = NULL; +} /* - * Prepare for a sequential search of an entire database. (In the DB model, - * use per-session key. There is guaranteed to be no more than one traversal in - * progress per session at any given time.) + * Prepare for a sequential search of an entire database. + * (There is guaranteed to be no more than one traversal in + * progress per thread at any given time.) */ void cdb_rewind(int cdb) { int ret = 0; - begin_critical_section(S_DATABASE); - ret = dbp[cdb]->cursor(dbp[cdb], NULL, &MYCURSOR, 0); + if (MYCURSORS[cdb] != NULL) { + lprintf(CTDL_EMERG, + "cdb_rewind: must close cursor on database %d before reopening.\n", + cdb); + abort(); + /* cclose(MYCURSORS[cdb]); */ + } + + /* + * Now initialize the cursor + */ + ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0); if (ret) { - lprintf(1, "db_cursor: %s\n", db_strerror(ret)); + lprintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n", + db_strerror(ret)); + abort(); } - end_critical_section(S_DATABASE); } @@ -300,23 +828,113 @@ struct cdbdata *cdb_next_item(int cdb) struct cdbdata *cdbret; int ret = 0; - /* Initialize the key/data pair so the flags aren't set. */ - memset(&key, 0, sizeof(key)); - memset(&data, 0, sizeof(data)); + /* Initialize the key/data pair so the flags aren't set. */ + memset(&key, 0, sizeof(key)); + memset(&data, 0, sizeof(data)); data.flags = DB_DBT_MALLOC; - begin_critical_section(S_DATABASE); - lprintf(9, "cdb_next_item(%d)...\n", cdb); - ret = MYCURSOR->c_get(MYCURSOR, - &key, &data, DB_NEXT); - lprintf(9, "...returned %d\n", ret); - end_critical_section(S_DATABASE); - - if (ret) return NULL; /* presumably, end of file */ + ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT); - cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata)); + if (ret) { + if (ret != DB_NOTFOUND) { + lprintf(CTDL_EMERG, "cdb_next_item(%d): %s\n", + cdb, db_strerror(ret)); + abort(); + } + cclose(MYCURSORS[cdb]); + MYCURSORS[cdb] = NULL; + return NULL; /* presumably, end of file */ + } + + cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata)); cdbret->len = data.size; cdbret->ptr = data.data; +#ifdef HAVE_ZLIB + cdb_decompress_if_necessary(cdbret); +#endif return (cdbret); } + + + +/* + * Transaction-based stuff. I'm writing this as I bake cookies... + */ + +void cdb_begin_transaction(void) +{ + + bailIfCursor(MYCURSORS, + "can't begin transaction during r/o cursor"); + + if (MYTID != NULL) { + lprintf(CTDL_EMERG, + "cdb_begin_transaction: ERROR: nested transaction\n"); + abort(); + } + + txbegin(&MYTID); +} + +void cdb_end_transaction(void) +{ + int i; + + for (i = 0; i < MAXCDB; i++) + if (MYCURSORS[i] != NULL) { + lprintf(CTDL_WARNING, + "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n", + i); + cclose(MYCURSORS[i]); + MYCURSORS[i] = NULL; + } + + if (MYTID == NULL) { + lprintf(CTDL_EMERG, + "cdb_end_transaction: ERROR: txcommit(NULL) !!\n"); + abort(); + } else + txcommit(MYTID); + + MYTID = NULL; +} + +/* + * Truncate (delete every record) + */ +void cdb_trunc(int cdb) +{ + /* DB_TXN *tid; */ + int ret; + u_int32_t count; + + if (MYTID != NULL) { + lprintf(CTDL_EMERG, + "cdb_trunc must not be called in a transaction.\n"); + abort(); + } else { + bailIfCursor(MYCURSORS, + "attempt to write during r/o cursor"); + + retry: + /* txbegin(&tid); */ + + if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */ + NULL, /* transaction ID */ + &count, /* #rows deleted */ + 0))) { /* flags */ + if (ret == DB_LOCK_DEADLOCK) { + /* txabort(tid); */ + goto retry; + } else { + lprintf(CTDL_EMERG, + "cdb_truncate(%d): %s\n", cdb, + db_strerror(ret)); + abort(); + } + } else { + /* txcommit(tid); */ + } + } +}