*
*/
+/*****************************************************************************
+ Tunable configuration parameters for the Sleepycat DB back end
+ *****************************************************************************/
+
+/* Citadel will checkpoint the db at the end of every session, but only if
+ * the specified number of kilobytes has been written, or if the specified
+ * number of minutes has passed, since the last checkpoint.
+ */
+#define MAX_CHECKPOINT_KBYTES 0
+#define MAX_CHECKPOINT_MINUTES 15
+
+/*****************************************************************************/
+
#include "sysdep.h"
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
-#include <time.h>
#include <ctype.h>
#include <string.h>
#include <errno.h>
+#include <sys/types.h>
#include <db.h>
+#include <pthread.h>
#include "citadel.h"
#include "server.h"
#include "citserver.h"
#include "database.h"
#include "sysdep_decls.h"
+#include "dynloader.h"
+
+DB *dbp[MAXCDB]; /* One DB handle for each Citadel database */
+DB_ENV *dbenv; /* The DB environment (global) */
+
+struct cdbtsd { /* Thread-specific DB stuff */
+ DB_TXN *tid; /* Transaction handle */
+ DBC *cursor; /* Cursor, for traversals... */
+};
+int num_ssd = 0;
+
+static pthread_key_t tsdkey;
+
+#define MYCURSOR (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursor)
+#define MYTID (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
+
+/* just a little helper function */
+static int txabort(DB_TXN *tid) {
+ int ret = txn_abort(tid);
+
+ if (ret)
+ lprintf(1, "txn_abort: %s\n", db_strerror(ret));
+
+ return ret;
+}
+
+/* this one is even more helpful than the last. */
+static int txcommit(DB_TXN *tid) {
+ int ret = txn_commit(tid, 0);
+
+ if (ret)
+ lprintf(1, "txn_commit: %s\n", db_strerror(ret));
+
+ return ret;
+}
+
+/* are you sensing a pattern yet? */
+static int txbegin(DB_TXN **tid) {
+ int ret = txn_begin(dbenv, NULL, tid, 0);
+
+ if (ret)
+ lprintf(1, "txn_begin: %s\n", db_strerror(ret));
+
+ return ret;
+}
+
+static void release_handles(void *arg) {
+ if (arg != NULL) {
+ struct cdbtsd *tsd = (struct cdbtsd *)arg;
+
+ if (tsd->cursor != NULL) {
+ lprintf(1, "WARNING: cursor still in progress; "
+ "closing!\n");
+ tsd->cursor->c_close(tsd->cursor);
+ }
+
+ if (tsd->tid != NULL) {
+ lprintf(1, "ERROR: transaction still in progress; "
+ "aborting!\n");
+ txabort(tsd->tid);
+ }
+ }
+}
+
+static void dest_tsd(void *arg) {
+ if (arg != NULL) {
+ release_handles(arg);
+ phree(arg);
+ }
+}
/*
- * This array holds one DB handle for each Citadel database.
+ * Ensure that we have a key for thread-specific data. We don't
+ * put anything in here that Citadel cares about; this is just database
+ * related stuff like cursors and transactions.
+ *
+ * This should be called immediately after startup by any thread which wants
+ * to use database calls, except for whatever thread calls open_databases.
*/
-DB *dbp[MAXCDB];
+void cdb_allocate_tsd(void) {
+ struct cdbtsd *tsd = mallok(sizeof *tsd);
+
+ tsd->tid = NULL;
+ tsd->cursor = NULL;
+ pthread_setspecific(tsdkey, tsd);
+}
+
+void cdb_free_tsd(void) {
+ dest_tsd(pthread_getspecific(tsdkey));
+ pthread_setspecific(tsdkey, NULL);
+}
-DB_ENV *dbenv;
+void cdb_release_handles(void) {
+ release_handles(pthread_getspecific(tsdkey));
+}
-DBC *MYCURSOR; /* FIXME !! */
/*
* Reclaim unused space in the databases. We need to do each one of
* these discretely, rather than in a loop.
+ *
+ * This is a stub function in the Sleepycat DB backend, because there is no
+ * such API call available.
*/
void defrag_databases(void)
{
- /* FIXME ... do we even need this? If not, we'll just keep it as
- * a stub function to keep the API consistent.
- */
+ /* do nothing */
}
+
+/*
+ * Request a checkpoint of the database.
+ */
+static void cdb_checkpoint(void) {
+ int ret;
+
+ ret = txn_checkpoint(dbenv,
+ MAX_CHECKPOINT_KBYTES,
+ MAX_CHECKPOINT_MINUTES,
+ 0);
+ if (ret) {
+ lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret));
+ }
+}
+
/*
* Open the various databases we'll be using. Any database which
* does not exist should be created. Note that we don't need an S_DATABASE
{
int ret;
int i;
- char dbfilename[256];
+ char dbfilename[SIZ];
+ u_int32_t flags = 0;
+ lprintf(9, "open_databases() starting\n");
/*
* Silently try to create the database subdirectory. If it's
* already there, no problem.
system("exec mkdir data 2>/dev/null");
lprintf(9, "Setting up DB environment\n");
+ db_env_set_func_yield(sched_yield);
ret = db_env_create(&dbenv, 0);
if (ret) {
lprintf(1, "db_env_create: %s\n", db_strerror(ret));
exit(ret);
}
- dbenv->set_errfile(dbenv, stderr); /* FIXME */
dbenv->set_errpfx(dbenv, "citserver");
/*
exit(ret);
}
- /*
- * We have multiple processes reading/writing these files, so
- * we need concurrency control and a shared buffer pool, but
- * not logging or transactions.
- */
- /* (void)dbenv->set_data_dir(dbenv, "/database/files"); */
- ret = dbenv->open(dbenv, "./data",
- DB_CREATE | DB_INIT_LOCK | DB_INIT_MPOOL | DB_THREAD, 0);
+ if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
+ lprintf(1, "set_lk_detect: %s\n", db_strerror(ret));
+ dbenv->close(dbenv, 0);
+ exit(ret);
+ }
+
+ flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
+ DB_INIT_LOCK|DB_THREAD;
+ ret = dbenv->open(dbenv, "./data", flags, 0);
if (ret) {
lprintf(1, "dbenv->open: %s\n", db_strerror(ret));
dbenv->close(dbenv, 0);
lprintf(1, "db_open[%d]: %s\n", i, db_strerror(ret));
exit(ret);
}
+ }
+ if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
+ lprintf(1, "pthread_key_create: %s\n", strerror(ret));
+ exit(1);
}
+ cdb_allocate_tsd();
+ CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
+ lprintf(9, "open_databases() finished\n");
}
int a;
int ret;
- begin_critical_section(S_DATABASE);
+ cdb_free_tsd();
+
+ if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
+ lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret));
+ }
+
for (a = 0; a < MAXCDB; ++a) {
lprintf(7, "Closing database %d\n", a);
ret = dbp[a]->close(dbp[a], 0);
}
-
-
/* Close the handle. */
ret = dbenv->close(dbenv, 0);
if (ret) {
lprintf(1, "DBENV->close: %s\n", db_strerror(ret));
}
-
-
- end_critical_section(S_DATABASE);
-
}
-
/*
* Store a piece of data. Returns 0 if the operation was successful. If a
* key already exists it should be overwritten.
{
DBT dkey, ddata;
+ DB_TXN *tid;
int ret;
memset(&dkey, 0, sizeof(DBT));
ddata.size = cdatalen;
ddata.data = cdata;
- begin_critical_section(S_DATABASE);
- ret = dbp[cdb]->put(dbp[cdb], /* db */
- NULL, /* transaction ID (hmm...) */
- &dkey, /* key */
- &ddata, /* data */
- 0); /* flags */
- end_critical_section(S_DATABASE);
- lprintf(9, "put ( to file %d) returned %3d (%d bytes)\n",
- cdb, ret, ddata.size);
- if (ret) {
- lprintf(1, "cdb_store: %s\n", db_strerror(ret));
- return (-1);
+ if (MYTID != NULL) {
+ ret = dbp[cdb]->put(dbp[cdb], /* db */
+ MYTID, /* transaction ID */
+ &dkey, /* key */
+ &ddata, /* data */
+ 0); /* flags */
+ if (ret) {
+ lprintf(1, "cdb_store(%d): %s\n", cdb,
+ db_strerror(ret));
+ }
+ return ret;
+ } else {
+ retry:
+ if (txbegin(&tid))
+ return -1;
+
+ if ((ret = dbp[cdb]->put(dbp[cdb], /* db */
+ tid, /* transaction ID */
+ &dkey, /* key */
+ &ddata, /* data */
+ 0))) { /* flags */
+ if (ret == DB_LOCK_DEADLOCK) {
+ if (txabort(tid))
+ return ret;
+ else
+ goto retry;
+ } else {
+ lprintf(1, "cdb_store(%d): %s\n", cdb,
+ db_strerror(ret));
+ txabort(tid);
+ return ret;
+ }
+ } else {
+ return txcommit(tid);
+ }
}
- return (0);
}
{
DBT dkey;
+ DB_TXN *tid;
int ret;
dkey.size = keylen;
dkey.data = key;
- begin_critical_section(S_DATABASE);
- ret = dbp[cdb]->del(dbp[cdb], NULL, &dkey, 0);
- end_critical_section(S_DATABASE);
- return (ret);
-
+ if (MYTID != NULL) {
+ ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
+ return (ret);
+ } else {
+ retry:
+ if (txbegin(&tid))
+ return -1;
+
+ if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))) {
+ if (ret == DB_LOCK_DEADLOCK) {
+ if (txabort(tid))
+ return ret;
+ else
+ goto retry;
+ } else {
+ lprintf(1, "cdb_store(%d): %s\n", cdb,
+ db_strerror(ret));
+ txabort(tid);
+ return ret;
+ }
+ } else {
+ return txcommit(tid);
+ }
+ }
}
struct cdbdata *tempcdb;
DBT dkey, dret;
+ DB_TXN *tid;
int ret;
memset(&dkey, 0, sizeof(DBT));
dkey.data = key;
dret.flags = DB_DBT_MALLOC;
- begin_critical_section(S_DATABASE);
- ret = dbp[cdb]->get(dbp[cdb], NULL, &dkey, &dret, 0);
- end_critical_section(S_DATABASE);
- lprintf(9, "get (from file %d) returned %3d (%d bytes)\n",
- cdb, ret, dret.size);
+ if (MYTID != NULL) {
+ ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
+ } else {
+ retry:
+ if (txbegin(&tid))
+ return NULL;
+
+ ret = dbp[cdb]->get(dbp[cdb], tid, &dkey, &dret, 0);
+
+ if (ret == DB_LOCK_DEADLOCK) {
+ if (txabort(tid))
+ return NULL;
+ else
+ goto retry;
+ } else if (txcommit(tid))
+ return NULL;
+ }
+
if ((ret != 0) && (ret != DB_NOTFOUND)) {
lprintf(1, "cdb_fetch: %s\n", db_strerror(ret));
- return NULL;
}
+ if (ret != 0) return NULL;
tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
if (tempcdb == NULL) {
lprintf(2, "Cannot allocate memory!\n");
/*
- * Prepare for a sequential search of an entire database. (In the DB model,
- * use per-session key. There is guaranteed to be no more than one traversal in
- * progress per session at any given time.)
+ * Prepare for a sequential search of an entire database.
+ * (There is guaranteed to be no more than one traversal in
+ * progress per thread at any given time.)
*/
void cdb_rewind(int cdb)
{
int ret = 0;
- begin_critical_section(S_DATABASE);
- ret = dbp[cdb]->cursor(dbp[cdb], NULL, &MYCURSOR, 0);
+ if (MYCURSOR != NULL)
+ MYCURSOR->c_close(MYCURSOR);
+
+ if (MYTID == NULL) {
+ lprintf(1, "ERROR: cursor use outside transaction\n");
+ abort();
+ }
+
+ /*
+ * Now initialize the cursor
+ */
+ ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0);
if (ret) {
lprintf(1, "db_cursor: %s\n", db_strerror(ret));
}
- end_critical_section(S_DATABASE);
}
memset(&data, 0, sizeof(data));
data.flags = DB_DBT_MALLOC;
- begin_critical_section(S_DATABASE);
ret = MYCURSOR->c_get(MYCURSOR,
&key, &data, DB_NEXT);
- end_critical_section(S_DATABASE);
- if (ret) return NULL; /* presumably, end of file */
+ if (ret) {
+ MYCURSOR->c_close(MYCURSOR);
+ MYCURSOR = NULL;
+ return NULL; /* presumably, end of file */
+ }
cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
cdbret->len = data.size;
return (cdbret);
}
+
+
+/*
+ * Transaction-based stuff. I'm writing this as I bake cookies...
+ */
+
+void cdb_begin_transaction(void) {
+
+ if (MYTID != NULL) { /* FIXME this slows it down, take it out */
+ lprintf(1, "ERROR: opening a new transaction with one already open!\n");
+ abort();
+ }
+ else {
+ txbegin(&MYTID);
+ }
+}
+
+void cdb_end_transaction(void) {
+ if (MYCURSOR != NULL) {
+ lprintf(1, "WARNING: cursor still open at transaction end\n");
+ MYCURSOR->c_close(MYCURSOR);
+ MYCURSOR = NULL;
+ }
+ if (MYTID == NULL) lprintf(1, "ERROR: txcommit(NULL) !!\n");
+ else txcommit(MYTID);
+ MYTID = NULL;
+}
+