]> code.citadel.org Git - citadel.git/blobdiff - citadel/database_sleepycat.c
* more merges
[citadel.git] / citadel / database_sleepycat.c
index 5b790c7462aa022bee3d2a88a2a080ccf46a5254..82d886e8989a4fd994a0641585e1b7d333c3751b 100644 (file)
  *
  */
 
+/*****************************************************************************
+       Tunable configuration parameters for the Sleepycat DB back end
+ *****************************************************************************/
+
+/* Citadel will checkpoint the db at the end of every session, but only if
+ * the specified number of kilobytes has been written, or if the specified
+ * number of minutes has passed, since the last checkpoint.
+ */
+#define MAX_CHECKPOINT_KBYTES  0
+#define MAX_CHECKPOINT_MINUTES 15
+
+/*****************************************************************************/
+
 #include "sysdep.h"
 #include <stdlib.h>
 #include <unistd.h>
 #include <stdio.h>
-#include <time.h>
 #include <ctype.h>
 #include <string.h>
 #include <errno.h>
+#include <sys/types.h>
 #include <db.h>
+#include <pthread.h>
 #include "citadel.h"
 #include "server.h"
 #include "citserver.h"
 #include "database.h"
 #include "sysdep_decls.h"
+#include "dynloader.h"
+
+DB *dbp[MAXCDB];               /* One DB handle for each Citadel database */
+DB_ENV *dbenv;                 /* The DB environment (global) */
+
+struct cdbtsd {                        /* Thread-specific DB stuff */
+       DB_TXN *tid;            /* Transaction handle */
+       DBC *cursor;            /* Cursor, for traversals... */
+};
+
+int num_ssd = 0;
+
+static pthread_key_t tsdkey;
+
+#define MYCURSOR       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursor)
+#define MYTID          (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
+
+/* just a little helper function */
+static int txabort(DB_TXN *tid) {
+        int ret = txn_abort(tid);
+
+        if (ret)
+                lprintf(1, "txn_abort: %s\n", db_strerror(ret));
+
+        return ret;
+}
+
+/* this one is even more helpful than the last. */
+static int txcommit(DB_TXN *tid) {
+        int ret = txn_commit(tid, 0);
+
+        if (ret)
+                lprintf(1, "txn_commit: %s\n", db_strerror(ret));
+
+        return ret;
+}
+
+/* are you sensing a pattern yet? */
+static int txbegin(DB_TXN **tid) {
+        int ret = txn_begin(dbenv, NULL, tid, 0);
+
+        if (ret)
+                lprintf(1, "txn_begin: %s\n", db_strerror(ret));
+
+        return ret;
+}
+
+static void release_handles(void *arg) {
+       if (arg != NULL) {
+               struct cdbtsd *tsd = (struct cdbtsd *)arg;
+
+               if (tsd->cursor != NULL) {
+                       lprintf(1, "WARNING: cursor still in progress; "
+                               "closing!\n");
+                       tsd->cursor->c_close(tsd->cursor);
+               }
+
+               if (tsd->tid != NULL) {
+                       lprintf(1, "ERROR: transaction still in progress; "
+                               "aborting!\n");
+                       txabort(tsd->tid);
+               }
+       }
+}
 
+static void dest_tsd(void *arg) {
+       if (arg != NULL) {
+               release_handles(arg);
+               phree(arg);
+       }
+}
 
 /*
- * This array holds one DB handle for each Citadel database.
+ * Ensure that we have a key for thread-specific data.  We don't
+ * put anything in here that Citadel cares about; this is just database
+ * related stuff like cursors and transactions.
+ *
+ * This should be called immediately after startup by any thread which wants
+ * to use database calls, except for whatever thread calls open_databases.
  */
-DB *dbp[MAXCDB];
+void cdb_allocate_tsd(void) {
+       struct cdbtsd *tsd = mallok(sizeof *tsd);
+
+       tsd->tid = NULL;
+       tsd->cursor = NULL;
+       pthread_setspecific(tsdkey, tsd);
+}
 
-DB_ENV *dbenv;
+void cdb_free_tsd(void) {
+       dest_tsd(pthread_getspecific(tsdkey));
+       pthread_setspecific(tsdkey, NULL);
+}
+
+void cdb_release_handles(void) {
+       release_handles(pthread_getspecific(tsdkey));
+}
 
-DBC **cursorz = NULL;
-int num_cursorz = 0;
-#define MYCURSOR cursorz[CC->cs_pid]
 
 /*
  * Reclaim unused space in the databases.  We need to do each one of
@@ -45,6 +144,22 @@ void defrag_databases(void)
 }
 
 
+
+/*
+ * Request a checkpoint of the database.
+ */
+static void cdb_checkpoint(void) {
+       int ret;
+
+       ret = txn_checkpoint(dbenv,
+                               MAX_CHECKPOINT_KBYTES,
+                               MAX_CHECKPOINT_MINUTES,
+                               0);
+       if (ret) {
+               lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret));
+       }
+}
+
 /*
  * Open the various databases we'll be using.  Any database which
  * does not exist should be created.  Note that we don't need an S_DATABASE
@@ -55,8 +170,10 @@ void open_databases(void)
 {
        int ret;
        int i;
-       char dbfilename[256];
+       char dbfilename[SIZ];
+       u_int32_t flags = 0;
 
+       lprintf(9, "open_databases() starting\n");
         /*
          * Silently try to create the database subdirectory.  If it's
          * already there, no problem.
@@ -64,6 +181,7 @@ void open_databases(void)
         system("exec mkdir data 2>/dev/null");
 
        lprintf(9, "Setting up DB environment\n");
+       db_env_set_func_yield(sched_yield);
        ret = db_env_create(&dbenv, 0);
        if (ret) {
                lprintf(1, "db_env_create: %s\n", db_strerror(ret));
@@ -82,15 +200,15 @@ void open_databases(void)
                 exit(ret);
         }
 
-        /*
-         * We have multiple processes reading/writing these files, so
-         * we need concurrency control and a shared buffer pool, but
-         * not logging or transactions.
-         */
-        /* (void)dbenv->set_data_dir(dbenv, "/database/files"); */
-        ret = dbenv->open(dbenv, "./data",
-               ( DB_CREATE | DB_INIT_MPOOL ),
-               0);
+       if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
+               lprintf(1, "set_lk_detect: %s\n", db_strerror(ret));
+               dbenv->close(dbenv, 0);
+               exit(ret);
+       }
+
+        flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
+               DB_INIT_LOCK|DB_THREAD;
+        ret = dbenv->open(dbenv, "./data", flags, 0);
        if (ret) {
                lprintf(1, "dbenv->open: %s\n", db_strerror(ret));
                 dbenv->close(dbenv, 0);
@@ -124,9 +242,16 @@ void open_databases(void)
                        lprintf(1, "db_open[%d]: %s\n", i, db_strerror(ret));
                        exit(ret);
                }
+       }
 
+       if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
+               lprintf(1, "pthread_key_create: %s\n", strerror(ret));
+               exit(1);
        }
 
+       cdb_allocate_tsd();
+       CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
+       lprintf(9, "open_databases() finished\n");
 }
 
 
@@ -139,7 +264,12 @@ void close_databases(void)
        int a;
        int ret;
 
-       begin_critical_section(S_DATABASE);
+       cdb_free_tsd();
+
+       if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
+               lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret));
+       }
+
        for (a = 0; a < MAXCDB; ++a) {
                lprintf(7, "Closing database %d\n", a);
                ret = dbp[a]->close(dbp[a], 0);
@@ -149,20 +279,13 @@ void close_databases(void)
                
        }
 
-
-
         /* Close the handle. */
         ret = dbenv->close(dbenv, 0);
        if (ret) {
                 lprintf(1, "DBENV->close: %s\n", db_strerror(ret));
         }
-
-
-       end_critical_section(S_DATABASE);
-
 }
 
-
 /*
  * Store a piece of data.  Returns 0 if the operation was successful.  If a
  * key already exists it should be overwritten.
@@ -173,6 +296,7 @@ int cdb_store(int cdb,
 {
 
        DBT dkey, ddata;
+       DB_TXN *tid;
        int ret;
 
        memset(&dkey, 0, sizeof(DBT));
@@ -182,21 +306,42 @@ int cdb_store(int cdb,
        ddata.size = cdatalen;
        ddata.data = cdata;
 
-       begin_critical_section(S_DATABASE);
-       lprintf(9, "cdb_store(%d) ...\n", cdb);
-       ret = dbp[cdb]->put(dbp[cdb],           /* db */
-                               NULL,           /* transaction ID (hmm...) */
-                               &dkey,          /* key */
-                               &ddata,         /* data */
-                               0);             /* flags */
-       end_critical_section(S_DATABASE);
-       lprintf(9, "...put (  to file %d) returned %3d (%d bytes)\n",
-               cdb, ret, ddata.size);
-       if (ret) {
-               lprintf(1, "cdb_store: %s\n", db_strerror(ret));
-               return (-1);
+       if (MYTID != NULL) {
+               ret = dbp[cdb]->put(dbp[cdb],           /* db */
+                                       MYTID,          /* transaction ID */
+                                       &dkey,          /* key */
+                                       &ddata,         /* data */
+                                       0);             /* flags */
+               if (ret) {
+                       lprintf(1, "cdb_store(%d): %s\n", cdb,
+                               db_strerror(ret));
+               }
+               return ret;
+       } else {
+           retry:
+               if (txbegin(&tid))
+                       return -1;
+
+               if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
+                                        tid,           /* transaction ID */
+                                        &dkey,         /* key */
+                                        &ddata,        /* data */
+                                        0))) {         /* flags */
+                       if (ret == DB_LOCK_DEADLOCK) {
+                               if (txabort(tid))
+                                       return ret;
+                               else
+                                       goto retry;
+                       } else {
+                               lprintf(1, "cdb_store(%d): %s\n", cdb,
+                                       db_strerror(ret));
+                               txabort(tid);
+                               return ret;
+                       }
+               } else {
+                       return txcommit(tid);
+               }
        }
-       return (0);
 }
 
 
@@ -207,18 +352,36 @@ int cdb_delete(int cdb, void *key, int keylen)
 {
 
        DBT dkey;
+       DB_TXN *tid;
        int ret;
 
        dkey.size = keylen;
        dkey.data = key;
 
-       begin_critical_section(S_DATABASE);
-       lprintf(9, "cdb_delete(%d) ...\n", cdb);
-       ret = dbp[cdb]->del(dbp[cdb], NULL, &dkey, 0);
-       lprintf(9, "cdb_delete returned %d\n", ret);
-       end_critical_section(S_DATABASE);
-       return (ret);
-
+       if (MYTID != NULL) {
+               ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
+               return (ret);
+       } else {
+           retry:
+               if (txbegin(&tid))
+                       return -1;
+
+               if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))) {
+                       if (ret == DB_LOCK_DEADLOCK) {
+                                       if (txabort(tid))
+                                               return ret;
+                                       else
+                                               goto retry;
+                       } else {
+                               lprintf(1, "cdb_store(%d): %s\n", cdb,
+                                       db_strerror(ret));
+                               txabort(tid);
+                               return ret;
+                       }
+               } else {
+                       return txcommit(tid);
+               }
+       }
 }
 
 
@@ -234,6 +397,7 @@ struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
 
        struct cdbdata *tempcdb;
        DBT dkey, dret;
+       DB_TXN *tid;
        int ret;
 
        memset(&dkey, 0, sizeof(DBT));
@@ -242,12 +406,24 @@ struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
        dkey.data = key;
        dret.flags = DB_DBT_MALLOC;
 
-       begin_critical_section(S_DATABASE);
-       lprintf(9, "cdb_fetch(%d) ...\n", cdb);
-       ret = dbp[cdb]->get(dbp[cdb], NULL, &dkey, &dret, 0);
-       end_critical_section(S_DATABASE);
-       lprintf(9, "get (from file %d) returned %3d (%d bytes)\n",
-               cdb, ret, dret.size);
+       if (MYTID != NULL) {
+               ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
+       } else {
+           retry:
+               if (txbegin(&tid))
+                       return NULL;
+
+               ret = dbp[cdb]->get(dbp[cdb], tid, &dkey, &dret, 0);
+
+               if (ret == DB_LOCK_DEADLOCK) {
+                       if (txabort(tid))
+                               return NULL;
+                       else
+                               goto retry;
+               } else if (txcommit(tid))
+                       return NULL;
+       }
+
        if ((ret != 0) && (ret != DB_NOTFOUND)) {
                lprintf(1, "cdb_fetch: %s\n", db_strerror(ret));
        }
@@ -276,37 +452,27 @@ void cdb_free(struct cdbdata *cdb)
 /* 
  * Prepare for a sequential search of an entire database.
  * (There is guaranteed to be no more than one traversal in
- * progress per session at any given time.)
+ * progress per thread at any given time.)
  */
 void cdb_rewind(int cdb)
 {
        int ret = 0;
 
-       /*
-        * Make sure we have a cursor allocated for this session
-        */
+       if (MYCURSOR != NULL)
+               MYCURSOR->c_close(MYCURSOR);
 
-       if (num_cursorz <= CC->cs_pid) {
-               num_cursorz = CC->cs_pid + 1;
-               if (cursorz == NULL) {
-                       cursorz = (DBC **)
-                           mallok((sizeof(DBC *) * num_cursorz));
-               } else {
-                       cursorz = (DBC **)
-                           reallok(cursorz, (sizeof(DBC *) * num_cursorz));
-               }
+       if (MYTID == NULL) {
+               lprintf(1, "ERROR: cursor use outside transaction\n");
+               abort();
        }
 
-
        /*
         * Now initialize the cursor
         */
-       begin_critical_section(S_DATABASE);
-       ret = dbp[cdb]->cursor(dbp[cdb], NULL, &MYCURSOR, 0);
+       ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0);
        if (ret) {
                lprintf(1, "db_cursor: %s\n", db_strerror(ret));
        }
-       end_critical_section(S_DATABASE);
 }
 
 
@@ -325,14 +491,14 @@ struct cdbdata *cdb_next_item(int cdb)
         memset(&data, 0, sizeof(data));
        data.flags = DB_DBT_MALLOC;
 
-       begin_critical_section(S_DATABASE);
-       lprintf(9, "cdb_next_item(%d)...\n", cdb);
        ret = MYCURSOR->c_get(MYCURSOR,
                &key, &data, DB_NEXT);
-       lprintf(9, "...returned %d\n", ret);
-       end_critical_section(S_DATABASE);
        
-       if (ret) return NULL;           /* presumably, end of file */
+       if (ret) {
+               MYCURSOR->c_close(MYCURSOR);
+               MYCURSOR = NULL;
+               return NULL;            /* presumably, end of file */
+       }
 
        cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
        cdbret->len = data.size;
@@ -340,3 +506,31 @@ struct cdbdata *cdb_next_item(int cdb)
 
        return (cdbret);
 }
+
+
+/*
+ * Transaction-based stuff.  I'm writing this as I bake cookies...
+ */
+
+void cdb_begin_transaction(void) {
+
+       if (MYTID != NULL) {    /* FIXME this slows it down, take it out */
+               lprintf(1, "ERROR: opening a new transaction with one already open!\n");
+               abort();
+       }
+       else {
+               txbegin(&MYTID);
+       }
+}
+
+void cdb_end_transaction(void) {
+       if (MYCURSOR != NULL) {
+               lprintf(1, "WARNING: cursor still open at transaction end\n");
+               MYCURSOR->c_close(MYCURSOR);
+               MYCURSOR = NULL;
+       }
+       if (MYTID == NULL) lprintf(1, "ERROR: txcommit(NULL) !!\n");
+       else txcommit(MYTID);
+       MYTID = NULL;
+}
+