]> code.citadel.org Git - citadel.git/blobdiff - citadel/database_sleepycat.c
* Finished (mostly) the Sleepycat DB backend ... added transaction logging
[citadel.git] / citadel / database_sleepycat.c
index 4861f85e7157757e1366060799da6b6e89b90094..9731c9a7aba035c378913d6f5ab3694c639b569b 100644 (file)
 #include "citserver.h"
 #include "database.h"
 #include "sysdep_decls.h"
+#include "dynloader.h"
 
 
+/* 
+ * FIXME this should be defined somewhere else.
+ */
+int transaction_based = 1;
+
+DB *dbp[MAXCDB];               /* One DB handle for each Citadel database */
+DB_ENV *dbenv;                 /* The DB environment (global) */
+
+struct cdbssd {                        /* Session-specific DB stuff */
+       DBC *cursor;            /* Cursor, for traversals... */
+       DB_TXN *tid;            /* Transaction ID */
+};
+
+struct cdbssd *ssd_arr = NULL;
+int num_ssd = 0;
+#define MYCURSOR       ssd_arr[CC->cs_pid].cursor
+#define MYTID          ssd_arr[CC->cs_pid].tid
+
 /*
- * This array holds one DB handle for each Citadel database.
+ * Ensure that we have enough space for session-specific data.  We don't
+ * put anything in here that Citadel cares about; this is just database
+ * related stuff like cursors and transactions.
  */
-DB *dbp[MAXCDB];
+void cdb_allocate_ssd(void) {
+       /*
+        * Make sure we have a cursor allocated for this session
+        */
 
-DB_ENV *dbenv;
+       lprintf(9, "num_ssd before realloc = %d\n", num_ssd);
+       if (num_ssd <= CC->cs_pid) {
+               num_ssd = CC->cs_pid + 1;
+               if (ssd_arr == NULL) {
+                       ssd_arr = (struct cdbssd *)
+                           mallok((sizeof(struct cdbssd) * num_ssd));
+               } else {
+                       ssd_arr = (struct cdbssd *)
+                           reallok(ssd_arr, (sizeof(struct cdbssd) * num_ssd));
+               }
+       }
+       lprintf(9, "num_ssd  after realloc = %d\n", num_ssd);
+}
 
-DBC *cursorz[999];     /* FIXME !! */
-#define MYCURSOR cursorz[CC->cs_pid]
 
 /*
  * Reclaim unused space in the databases.  We need to do each one of
  * these discretely, rather than in a loop.
+ *
+ * This is a stub function in the Sleepycat DB backend, because there is no
+ * such API call available.
  */
 void defrag_databases(void)
 {
-       /* FIXME ... do we even need this?  If not, we'll just keep it as
-        * a stub function to keep the API consistent.
-        */
+       /* do nothing */
 }
 
 
@@ -54,6 +89,7 @@ void open_databases(void)
        int ret;
        int i;
        char dbfilename[256];
+       u_int32_t flags = 0;
 
         /*
          * Silently try to create the database subdirectory.  If it's
@@ -67,7 +103,6 @@ void open_databases(void)
                lprintf(1, "db_env_create: %s\n", db_strerror(ret));
                exit(ret);
        }
-       dbenv->set_errfile(dbenv, stderr);  /* FIXME */
        dbenv->set_errpfx(dbenv, "citserver");
 
         /*
@@ -82,14 +117,15 @@ void open_databases(void)
         }
 
         /*
-         * We have multiple processes reading/writing these files, so
-         * we need concurrency control and a shared buffer pool, but
-         * not logging or transactions.
+        * We specify DB_PRIVATE but not DB_INIT_LOCK or DB_THREAD, even
+        * though this is a multithreaded application.  Since Citadel does all
+        * database access in S_DATABASE critical sections, access to the db
+        * is serialized already, so don't bother the database manager with
+        * it.  Besides, it locks up when we do it that way.
          */
-        /* (void)dbenv->set_data_dir(dbenv, "/database/files"); */
-        ret = dbenv->open(dbenv, "./data",
-               ( DB_CREATE | DB_INIT_LOCK | DB_INIT_MPOOL | DB_THREAD ),
-               0);
+        flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE;
+       if (transaction_based) flags = flags | DB_INIT_TXN;
+        ret = dbenv->open(dbenv, "./data", flags, 0);
        if (ret) {
                lprintf(1, "dbenv->open: %s\n", db_strerror(ret));
                 dbenv->close(dbenv, 0);
@@ -126,6 +162,10 @@ void open_databases(void)
 
        }
 
+       cdb_allocate_ssd();
+       CtdlRegisterSessionHook(cdb_allocate_ssd, EVT_START);
+
+
 }
 
 
@@ -138,7 +178,7 @@ void close_databases(void)
        int a;
        int ret;
 
-       /* begin_critical_section(S_DATABASE); */
+       begin_critical_section(S_DATABASE);
        for (a = 0; a < MAXCDB; ++a) {
                lprintf(7, "Closing database %d\n", a);
                ret = dbp[a]->close(dbp[a], 0);
@@ -157,7 +197,7 @@ void close_databases(void)
         }
 
 
-       /* end_critical_section(S_DATABASE); */
+       end_critical_section(S_DATABASE);
 
 }
 
@@ -181,16 +221,13 @@ int cdb_store(int cdb,
        ddata.size = cdatalen;
        ddata.data = cdata;
 
-       /* begin_critical_section(S_DATABASE); */
-       lprintf(9, "cdb_store(%d) ...\n", cdb);
+       begin_critical_section(S_DATABASE);
        ret = dbp[cdb]->put(dbp[cdb],           /* db */
-                               NULL,           /* transaction ID (hmm...) */
+                               MYTID,          /* transaction ID */
                                &dkey,          /* key */
                                &ddata,         /* data */
                                0);             /* flags */
-       /* end_critical_section(S_DATABASE); */
-       lprintf(9, "...put (  to file %d) returned %3d (%d bytes)\n",
-               cdb, ret, ddata.size);
+       end_critical_section(S_DATABASE);
        if (ret) {
                lprintf(1, "cdb_store: %s\n", db_strerror(ret));
                return (-1);
@@ -211,11 +248,9 @@ int cdb_delete(int cdb, void *key, int keylen)
        dkey.size = keylen;
        dkey.data = key;
 
-       /* begin_critical_section(S_DATABASE); */
-       lprintf(9, "cdb_delete(%d) ...\n", cdb);
-       ret = dbp[cdb]->del(dbp[cdb], NULL, &dkey, 0);
-       lprintf(9, "cdb_delete returned %d\n", ret);
-       /* end_critical_section(S_DATABASE); */
+       begin_critical_section(S_DATABASE);
+       ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
+       end_critical_section(S_DATABASE);
        return (ret);
 
 }
@@ -241,12 +276,9 @@ struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
        dkey.data = key;
        dret.flags = DB_DBT_MALLOC;
 
-       /* begin_critical_section(S_DATABASE); */
-       lprintf(9, "cdb_fetch(%d) ...\n", cdb);
-       ret = dbp[cdb]->get(dbp[cdb], NULL, &dkey, &dret, 0);
-       /* end_critical_section(S_DATABASE); */
-       lprintf(9, "get (from file %d) returned %3d (%d bytes)\n",
-               cdb, ret, dret.size);
+       begin_critical_section(S_DATABASE);
+       ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
+       end_critical_section(S_DATABASE);
        if ((ret != 0) && (ret != DB_NOTFOUND)) {
                lprintf(1, "cdb_fetch: %s\n", db_strerror(ret));
        }
@@ -273,20 +305,25 @@ void cdb_free(struct cdbdata *cdb)
 
 
 /* 
- * Prepare for a sequential search of an entire database.  (In the DB model,
- * use per-session key. There is guaranteed to be no more than one traversal in
+ * Prepare for a sequential search of an entire database.
+ * (There is guaranteed to be no more than one traversal in
  * progress per session at any given time.)
  */
 void cdb_rewind(int cdb)
 {
        int ret = 0;
 
-       /* begin_critical_section(S_DATABASE); */
-       ret = dbp[cdb]->cursor(dbp[cdb], NULL, &MYCURSOR, 0);
+       cdb_allocate_ssd();
+
+       /*
+        * Now initialize the cursor
+        */
+       begin_critical_section(S_DATABASE);
+       ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0);
        if (ret) {
                lprintf(1, "db_cursor: %s\n", db_strerror(ret));
        }
-       /* end_critical_section(S_DATABASE); */
+       end_critical_section(S_DATABASE);
 }
 
 
@@ -305,12 +342,10 @@ struct cdbdata *cdb_next_item(int cdb)
         memset(&data, 0, sizeof(data));
        data.flags = DB_DBT_MALLOC;
 
-       /* begin_critical_section(S_DATABASE); */
-       lprintf(9, "cdb_next_item(%d)...\n", cdb);
+       begin_critical_section(S_DATABASE);
        ret = MYCURSOR->c_get(MYCURSOR,
                &key, &data, DB_NEXT);
-       lprintf(9, "...returned %d\n", ret);
-       /* end_critical_section(S_DATABASE); */
+       end_critical_section(S_DATABASE);
        
        if (ret) return NULL;           /* presumably, end of file */
 
@@ -320,3 +355,21 @@ struct cdbdata *cdb_next_item(int cdb)
 
        return (cdbret);
 }
+
+
+/*
+ * Transaction-based stuff.  I'm writing this as I bake cookies...
+ */
+
+void cdb_begin_transaction(void) {
+       if (!transaction_based) {
+               MYTID = NULL;
+               return;
+       }
+
+       txn_begin(dbenv, NULL, &MYTID, 0);
+}
+
+void cdb_end_transaction(void) {
+       if (transaction_based) txn_commit(MYTID, 0);
+}