implemented read-only cursors. one of the advantages to these is that
authorNathan Bryant <loanshark@uncensored.citadel.org>
Sat, 28 Jul 2001 00:02:50 +0000 (00:02 +0000)
committerNathan Bryant <loanshark@uncensored.citadel.org>
Sat, 28 Jul 2001 00:02:50 +0000 (00:02 +0000)
transactions can be avoided; a cursor operation that occurs within a
transaction will often acquire a read lock on every single database page.  in
general, the Sleepycat documentation recommends avoiding transaction-protected
read-only operations where practical. read/modify/write operations can still
be transaction protected, of course.

to use a read-only cursor, call cdb_rewind without a previous call to
cdb_begin_transaction. the DB driver will notice this and prevent the current
thread from modifying data or starting a transaction until the cursor is
closed.

citadel/ChangeLog
citadel/database_sleepycat.c

index cb046aef011e3e95d6b593b5e50aee4e656b93d9..cbf754dbf225f862f225fa73e14623b39200d77e 100644 (file)
@@ -1,4 +1,17 @@
  $Log$
+ Revision 580.12  2001/07/28 00:02:50  nbryant
+ implemented read-only cursors. one of the advantages to these is that
+ transactions can be avoided; a cursor operation that occurs within a
+ transaction will often acquire a read lock on every single database page.  in
+ general, the Sleepycat documentation recommends avoiding transaction-protected
+ read-only operations where practical. read/modify/write operations can still
+ be transaction protected, of course.
+
+ to use a read-only cursor, call cdb_rewind without a previous call to
+ cdb_begin_transaction. the DB driver will notice this and prevent the current
+ thread from modifying data or starting a transaction until the cursor is
+ closed.
+
  Revision 580.11  2001/07/27 20:45:44  nbryant
  libtool has matured a lot since the last time i looked at it (years ago)
  so we now use it to handle the details of building shared libraries and
@@ -2606,4 +2619,3 @@ Sat Jul 11 00:20:48 EDT 1998 Nathan Bryant <bryant@cs.usm.maine.edu>
 
 Fri Jul 10 1998 Art Cancro <ajc@uncensored.citadel.org>
        * Initial CVS import 
-
index a07fefd058aba637f4a6234af8079d5490ab052a..3c52989ae995a06df8c54cd35cf064b6671d07e0 100644 (file)
 #include "sysdep_decls.h"
 #include "dynloader.h"
 
-DB *dbp[MAXCDB];               /* One DB handle for each Citadel database */
-DB_ENV *dbenv;                 /* The DB environment (global) */
+static DB *dbp[MAXCDB];                /* One DB handle for each Citadel database */
+static DB_ENV *dbenv;          /* The DB environment (global) */
 
 struct cdbtsd {                        /* Thread-specific DB stuff */
        DB_TXN *tid;            /* Transaction handle */
        DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
 };
 
-int num_ssd = 0;
-
 static pthread_key_t tsdkey;
 
 #define MYCURSORS      (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors)
@@ -91,17 +89,23 @@ static void cclose(DBC *cursor) {
        }
 }
 
+static void bailIfCursor(DBC **cursors, const char *msg)
+{
+  int i;
+
+  for (i = 0; i < MAXCDB; i++)
+    if (cursors[i] != NULL)
+      {
+       lprintf(1, "cdb_*: cursor still in progress on cdb %d: %s\n", i, msg);
+       abort();
+      }
+}
+
 static void check_handles(void *arg) {
        if (arg != NULL) {
                struct cdbtsd *tsd = (struct cdbtsd *)arg;
-               int i;
 
-               for (i = 0; i < MAXCDB; i++)
-                 if (tsd->cursors[i] != NULL)
-                   {
-                     lprintf(1, "cdb_*: cursor still in progress on cdb %d!", i);
-                     abort();
-                   }
+               bailIfCursor(tsd->cursors, "in check_handles");
 
                if (tsd->tid != NULL) {
                        lprintf(1, "cdb_*: transaction still in progress!");
@@ -165,7 +169,7 @@ void defrag_databases(void)
 /*
  * Cull the database logs
  */
-void cdb_cull_logs(void) {
+static void cdb_cull_logs(void) {
        DIR *dp;
        struct dirent *d;
        char filename[SIZ];
@@ -359,51 +363,64 @@ int cdb_store(int cdb,
              void *cdata, int cdatalen)
 {
 
-       DBT dkey, ddata;
-       DB_TXN *tid;
-       int ret;
-
-       memset(&dkey, 0, sizeof(DBT));
-       memset(&ddata, 0, sizeof(DBT));
-       dkey.size = ckeylen;
-       dkey.data = ckey;
-       ddata.size = cdatalen;
-       ddata.data = cdata;
-
-       if (MYTID != NULL) {
-               ret = dbp[cdb]->put(dbp[cdb],           /* db */
-                                       MYTID,          /* transaction ID */
-                                       &dkey,          /* key */
-                                       &ddata,         /* data */
-                                       0);             /* flags */
-               if (ret) {
-                       lprintf(1, "cdb_store(%d): %s\n", cdb,
-                               db_strerror(ret));
-                       abort();
-               }
-               return ret;
-       } else {
-           retry:
-               txbegin(&tid);
-
-               if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
-                                        tid,           /* transaction ID */
-                                        &dkey,         /* key */
-                                        &ddata,        /* data */
-                                        0))) {         /* flags */
-                       if (ret == DB_LOCK_DEADLOCK) {
-                               txabort(tid);
-                               goto retry;
-                       } else {
-                               lprintf(1, "cdb_store(%d): %s\n", cdb,
-                                       db_strerror(ret));
-                               abort();
-                       }
-               } else {
-                       txcommit(tid);
-                       return ret;
-               }
+  DBT dkey, ddata;
+  DB_TXN *tid;
+  int ret;
+  
+  memset(&dkey, 0, sizeof(DBT));
+  memset(&ddata, 0, sizeof(DBT));
+  dkey.size = ckeylen;
+  dkey.data = ckey;
+  ddata.size = cdatalen;
+  ddata.data = cdata;
+  
+  if (MYTID != NULL)
+    {
+      ret = dbp[cdb]->put(dbp[cdb],            /* db */
+                         MYTID,                /* transaction ID */
+                         &dkey,                /* key */
+                         &ddata,               /* data */
+                         0);           /* flags */
+      if (ret)
+       {
+         lprintf(1, "cdb_store(%d): %s\n", cdb,
+                 db_strerror(ret));
+         abort();
+       }
+      return ret;
+      
+    }
+  else
+    {
+      bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
+      
+    retry:
+      txbegin(&tid);
+      
+      if ((ret = dbp[cdb]->put(dbp[cdb],    /* db */
+                              tid,         /* transaction ID */
+                              &dkey,       /* key */
+                              &ddata,      /* data */
+                              0)))         /* flags */
+       {
+         if (ret == DB_LOCK_DEADLOCK)
+           {
+             txabort(tid);
+             goto retry;
+           }
+         else
+           {
+             lprintf(1, "cdb_store(%d): %s\n", cdb,
+                     db_strerror(ret));
+             abort();
+           }
+       }
+      else
+       {
+         txcommit(tid);
+         return ret;
        }
+    }
 }
 
 
@@ -413,44 +430,73 @@ int cdb_store(int cdb,
 int cdb_delete(int cdb, void *key, int keylen)
 {
 
-       DBT dkey;
-       DB_TXN *tid;
-       int ret;
+  DBT dkey;
+  DB_TXN *tid;
+  int ret;
 
-       memset(&dkey, 0, sizeof dkey);
-       dkey.size = keylen;
-       dkey.data = key;
+  memset(&dkey, 0, sizeof dkey);
+  dkey.size = keylen;
+  dkey.data = key;
 
-       if (MYTID != NULL) {
-               ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
-               if (ret) {
-                       lprintf(1, "cdb_delete(%d): %s\n", cdb,
-                               db_strerror(ret));
-                       if (ret != DB_NOTFOUND)
-                               abort();
-               }
-       } else {
-           retry:
-               txbegin(&tid);
-
-               if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
-                   && ret != DB_NOTFOUND) {
-                       if (ret == DB_LOCK_DEADLOCK) {
-                                       txabort(tid);
-                                       goto retry;
-                       } else {
-                               lprintf(1, "cdb_delete(%d): %s\n", cdb,
-                                       db_strerror(ret));
-                               abort();
-                       }
-               } else {
-                       txcommit(tid);
-               }
+  if (MYTID != NULL)
+    {
+      ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
+      if (ret)
+       {
+         lprintf(1, "cdb_delete(%d): %s\n", cdb,
+                 db_strerror(ret));
+         if (ret != DB_NOTFOUND)
+           abort();
        }
-       return ret;
+    }
+  else
+    {
+      bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
+    
+    retry:
+      txbegin(&tid);
+    
+      if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
+         && ret != DB_NOTFOUND)
+       {
+         if (ret == DB_LOCK_DEADLOCK)
+           {
+             txabort(tid);
+             goto retry;
+           }
+         else
+           {
+             lprintf(1, "cdb_delete(%d): %s\n", cdb,
+                     db_strerror(ret));
+             abort();
+           }
+       }
+      else
+       {
+         txcommit(tid);
+       }
+    }
+  return ret;
 }
 
+static DBC *localcursor(int cdb)
+{
+  int ret;
+  DBC *curs;
 
+  if (MYCURSORS[cdb] == NULL)
+    ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
+  else
+    ret = MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, DB_POSITION);
+
+  if (ret)
+    {
+      lprintf(1, "localcursor: %s\n", db_strerror(ret));
+      abort();
+    }
+
+  return curs;
+}
 
 
 /*
@@ -461,45 +507,56 @@ int cdb_delete(int cdb, void *key, int keylen)
 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
 {
 
-       struct cdbdata *tempcdb;
-       DBT dkey, dret;
-       int ret;
+  struct cdbdata *tempcdb;
+  DBT dkey, dret;
+  int ret;
 
-       memset(&dkey, 0, sizeof(DBT));
-       dkey.size = keylen;
-       dkey.data = key;
+  memset(&dkey, 0, sizeof(DBT));
+  dkey.size = keylen;
+  dkey.data = key;
 
-       if (MYTID != NULL) {
-               memset(&dret, 0, sizeof(DBT));
-               dret.flags = DB_DBT_MALLOC;
-               ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
-       } else {
-           retry:
-                memset(&dret, 0, sizeof(DBT));
-                dret.flags = DB_DBT_MALLOC;
+  if (MYTID != NULL)
+    {
+      memset(&dret, 0, sizeof(DBT));
+      dret.flags = DB_DBT_MALLOC;
+      ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
+    }
+  else
+    {
+      DBC *curs;
 
-               ret = dbp[cdb]->get(dbp[cdb], NULL, &dkey, &dret, 0);
+      do
+       {
+         memset(&dret, 0, sizeof(DBT));
+         dret.flags = DB_DBT_MALLOC;
 
-               if (ret == DB_LOCK_DEADLOCK)
-                       goto retry;
+         curs = localcursor(cdb);
 
-               if (ret && ret != DB_NOTFOUND)
-                       abort();
+         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
+         cclose(curs);
        }
+      while (ret == DB_LOCK_DEADLOCK);
 
-       if ((ret != 0) && (ret != DB_NOTFOUND)) {
-               lprintf(1, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
-               abort();
-       }
-       if (ret != 0) return NULL;
-       tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
-       if (tempcdb == NULL) {
-               lprintf(2, "cdb_fetch: Cannot allocate memory for tempcdb\n");
-               abort();
-       }
-       tempcdb->len = dret.size;
-       tempcdb->ptr = dret.data;
-       return (tempcdb);
+    }
+
+  if ((ret != 0) && (ret != DB_NOTFOUND))
+    {
+      lprintf(1, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
+      abort();
+    }
+
+  if (ret != 0) return NULL;
+  tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
+
+  if (tempcdb == NULL)
+    {
+      lprintf(2, "cdb_fetch: Cannot allocate memory for tempcdb\n");
+      abort();
+    }
+
+  tempcdb->len = dret.size;
+  tempcdb->ptr = dret.data;
+  return (tempcdb);
 }
 
 
@@ -526,11 +583,6 @@ void cdb_rewind(int cdb)
        if (MYCURSORS[cdb] != NULL)
                cclose(MYCURSORS[cdb]);
 
-       if (MYTID == NULL) {
-               lprintf(1, "cdb_rewind: ERROR: cursor use outside transaction\n");
-               abort();
-       }
-
        /*
         * Now initialize the cursor
         */
@@ -585,14 +637,15 @@ struct cdbdata *cdb_next_item(int cdb)
 
 void cdb_begin_transaction(void) {
 
-       /******** this check slows it down and is not needed except to debug
-       if (MYTID != NULL) {
-               lprintf(1, "cdb_begin_transaction: ERROR: opening a new transaction with one already open!\n");
-               abort();
-       }
-       ***************************/
+  bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
+
+  if (MYTID != NULL)
+    {
+      lprintf(1, "cdb_begin_transaction: ERROR: nested transaction\n");
+      abort();
+    }
 
-       txbegin(&MYTID);
+  txbegin(&MYTID);
 }
 
 void cdb_end_transaction(void) {