* Merged in code from the TRANSACTIONS branch for testing.
authorArt Cancro <ajc@citadel.org>
Tue, 9 Jan 2001 05:39:45 +0000 (05:39 +0000)
committerArt Cancro <ajc@citadel.org>
Tue, 9 Jan 2001 05:39:45 +0000 (05:39 +0000)
citadel/ChangeLog
citadel/database.c
citadel/database_sleepycat.c
citadel/room_ops.c
citadel/serv_expire.c
citadel/serv_upgrade.c
citadel/serv_vandelay.c
citadel/user_ops.c

index 9b6b32b94cecfb43c9cdf663aa414679eeefaaa7..28d07edc69fbbabd556dae4909a360f415cd5e9c 100644 (file)
@@ -1,4 +1,7 @@
  $Log$
+ Revision 573.62  2001/01/09 05:39:45  ajc
+ * Merged in code from the TRANSACTIONS branch for testing.
+
  Revision 573.61  2000/12/30 06:17:17  ajc
  * Still more work on IMAP.  Damn this is tedious.
 
@@ -2259,3 +2262,4 @@ Sat Jul 11 00:20:48 EDT 1998 Nathan Bryant <bryant@cs.usm.maine.edu>
 
 Fri Jul 10 1998 Art Cancro <ajc@uncensored.citadel.org>
        * Initial CVS import 
+
index c9bd3988a41baf0bfb885dbe101040c1c5e3a142..a95abbe174fdd18edfabac1a41cc9e8419c0c967 100644 (file)
@@ -338,3 +338,13 @@ struct cdbdata *cdb_next_item(int cdb)
        return (cdbret);
 }
 
+
+/*
+ * empty functions because GDBM doesn't have transaction support
+ */
+
+void cdb_begin_transaction(void) {
+}
+
+void cdb_end_transaction(void) {
+}
index f522fd1aec5adbe61478db19bf5f6d95d68e5b85..d5c0c4e4948fd70e2473337cf637cd34ec7b9468 100644 (file)
@@ -5,19 +5,29 @@
  *
  */
 
+/*****************************************************************************
+       Tunable configuration parameters for the Sleepycat DB back end
+ *****************************************************************************/
+
+/* Citadel will checkpoint the db at the end of every session, but only if
+ * the specified number of kilobytes has been written, or if the specified
+ * number of minutes has passed, since the last checkpoint.
+ */
+#define MAX_CHECKPOINT_KBYTES  0
+#define MAX_CHECKPOINT_MINUTES 15
+
 /*****************************************************************************/
 
 #include "sysdep.h"
 #include <stdlib.h>
 #include <unistd.h>
 #include <stdio.h>
-#include <time.h>
 #include <ctype.h>
 #include <string.h>
 #include <errno.h>
 #include <sys/types.h>
-#include <signal.h>
 #include <db.h>
+#include <pthread.h>
 #include "citadel.h"
 #include "server.h"
 #include "citserver.h"
 DB *dbp[MAXCDB];               /* One DB handle for each Citadel database */
 DB_ENV *dbenv;                 /* The DB environment (global) */
 
-struct cdbssd {                        /* Session-specific DB stuff */
+struct cdbtsd {                        /* Thread-specific DB stuff */
+       DB_TXN *tid;            /* Transaction handle */
        DBC *cursor;            /* Cursor, for traversals... */
 };
 
-struct cdbssd *ssd_arr = NULL;
 int num_ssd = 0;
 
-#define MYCURSOR       ssd_arr[CC->cs_pid].cursor
-#define MYTID          NULL
+static pthread_key_t tsdkey;
+
+#define MYCURSOR       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursor)
+#define MYTID          (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
+
+/* just a little helper function */
+static int txabort(DB_TXN *tid) {
+        int ret = txn_abort(tid);
+
+        if (ret)
+                lprintf(1, "txn_abort: %s\n", db_strerror(ret));
+
+        return ret;
+}
+
+/* this one is even more helpful than the last. */
+static int txcommit(DB_TXN *tid) {
+        int ret = txn_commit(tid, 0);
+
+        if (ret)
+                lprintf(1, "txn_commit: %s\n", db_strerror(ret));
+
+        return ret;
+}
+
+/* are you sensing a pattern yet? */
+static int txbegin(DB_TXN **tid) {
+        int ret = txn_begin(dbenv, NULL, tid, 0);
+
+        if (ret)
+                lprintf(1, "txn_begin: %s\n", db_strerror(ret));
+
+        return ret;
+}
+
+static void release_handles(void *arg) {
+       if (arg != NULL) {
+               struct cdbtsd *tsd = (struct cdbtsd *)arg;
+
+               if (tsd->cursor != NULL) {
+                       lprintf(1, "WARNING: cursor still in progress; "
+                               "closing!\n");
+                       tsd->cursor->c_close(tsd->cursor);
+               }
+
+               if (tsd->tid != NULL) {
+                       lprintf(1, "ERROR: transaction still in progress; "
+                               "aborting!\n");
+                       txabort(tsd->tid);
+               }
+       }
+}
+
+static void dest_tsd(void *arg) {
+       if (arg != NULL) {
+               release_handles(arg);
+               phree(arg);
+       }
+}
 
 /*
- * Ensure that we have enough space for session-specific data.  We don't
+ * Ensure that we have a key for thread-specific data.  We don't
  * put anything in here that Citadel cares about; this is just database
- * related stuff like cursors.
+ * related stuff like cursors and transactions.
+ *
+ * This should be called immediately after startup by any thread which wants
+ * to use database calls, except for whatever thread calls open_databases.
  */
-void cdb_allocate_ssd(void) {
-       /*
-        * Make sure we have a cursor allocated for this session
-        */
+void cdb_allocate_tsd(void) {
+       struct cdbtsd *tsd = mallok(sizeof *tsd);
 
-       lprintf(9, "num_ssd before realloc = %d\n", num_ssd);
-       if (num_ssd <= CC->cs_pid) {
-               num_ssd = CC->cs_pid + 1;
-               if (ssd_arr == NULL) {
-                       ssd_arr = (struct cdbssd *)
-                           mallok((sizeof(struct cdbssd) * num_ssd));
-               } else {
-                       ssd_arr = (struct cdbssd *)
-                           reallok(ssd_arr, (sizeof(struct cdbssd) * num_ssd));
-               }
-       }
-       lprintf(9, "num_ssd  after realloc = %d\n", num_ssd);
+       tsd->tid = NULL;
+       tsd->cursor = NULL;
+       pthread_setspecific(tsdkey, tsd);
+}
+
+void cdb_free_tsd(void) {
+       dest_tsd(pthread_getspecific(tsdkey));
+       pthread_setspecific(tsdkey, NULL);
+}
+
+void cdb_release_handles(void) {
+       release_handles(pthread_getspecific(tsdkey));
 }
 
 
@@ -77,6 +145,21 @@ void defrag_databases(void)
 
 
 
+/*
+ * Request a checkpoint of the database.
+ */
+static void cdb_checkpoint(void) {
+       int ret;
+
+       ret = txn_checkpoint(dbenv,
+                               MAX_CHECKPOINT_KBYTES,
+                               MAX_CHECKPOINT_MINUTES,
+                               0);
+       if (ret) {
+               lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret));
+       }
+}
+
 /*
  * Open the various databases we'll be using.  Any database which
  * does not exist should be created.  Note that we don't need an S_DATABASE
@@ -87,7 +170,7 @@ void open_databases(void)
 {
        int ret;
        int i;
-       char dbfilename[SIZ];
+       char dbfilename[256];
        u_int32_t flags = 0;
 
        lprintf(9, "open_databases() starting\n");
@@ -98,6 +181,7 @@ void open_databases(void)
         system("exec mkdir data 2>/dev/null");
 
        lprintf(9, "Setting up DB environment\n");
+       db_env_set_func_yield(sched_yield);
        ret = db_env_create(&dbenv, 0);
        if (ret) {
                lprintf(1, "db_env_create: %s\n", db_strerror(ret));
@@ -116,15 +200,14 @@ void open_databases(void)
                 exit(ret);
         }
 
-        /*
-        * We specify DB_PRIVATE but not DB_INIT_LOCK or DB_THREAD, even
-        * though this is a multithreaded application.  Since Citadel does all
-        * database access in S_DATABASE critical sections, access to the db
-        * is serialized already, so don't bother the database manager with
-        * it.  Besides, it locks up when we do it that way.
-         */
-        flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_LOG;
-       /* flags |= DB_INIT_LOCK | DB_THREAD; */
+       if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
+               lprintf(1, "set_lk_detect: %s\n", db_strerror(ret));
+               dbenv->close(dbenv, 0);
+               exit(ret);
+       }
+
+        flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
+               DB_INIT_LOCK|DB_THREAD;
         ret = dbenv->open(dbenv, "./data", flags, 0);
        if (ret) {
                lprintf(1, "dbenv->open: %s\n", db_strerror(ret));
@@ -161,8 +244,13 @@ void open_databases(void)
                }
        }
 
-       cdb_allocate_ssd();
-       CtdlRegisterSessionHook(cdb_allocate_ssd, EVT_START);
+       if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
+               lprintf(1, "pthread_key_create: %s\n", strerror(ret));
+               exit(1);
+       }
+
+       cdb_allocate_tsd();
+       CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
        lprintf(9, "open_databases() finished\n");
 }
 
@@ -176,7 +264,12 @@ void close_databases(void)
        int a;
        int ret;
 
-       begin_critical_section(S_DATABASE);
+       cdb_free_tsd();
+
+       if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
+               lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret));
+       }
+
        for (a = 0; a < MAXCDB; ++a) {
                lprintf(7, "Closing database %d\n", a);
                ret = dbp[a]->close(dbp[a], 0);
@@ -186,20 +279,13 @@ void close_databases(void)
                
        }
 
-
-
         /* Close the handle. */
         ret = dbenv->close(dbenv, 0);
        if (ret) {
                 lprintf(1, "DBENV->close: %s\n", db_strerror(ret));
         }
-
-
-       end_critical_section(S_DATABASE);
-
 }
 
-
 /*
  * Store a piece of data.  Returns 0 if the operation was successful.  If a
  * key already exists it should be overwritten.
@@ -210,6 +296,7 @@ int cdb_store(int cdb,
 {
 
        DBT dkey, ddata;
+       DB_TXN *tid;
        int ret;
 
        memset(&dkey, 0, sizeof(DBT));
@@ -219,18 +306,42 @@ int cdb_store(int cdb,
        ddata.size = cdatalen;
        ddata.data = cdata;
 
-       begin_critical_section(S_DATABASE);
-       ret = dbp[cdb]->put(dbp[cdb],           /* db */
-                               MYTID,          /* transaction ID */
-                               &dkey,          /* key */
-                               &ddata,         /* data */
-                               0);             /* flags */
-       end_critical_section(S_DATABASE);
-       if (ret) {
-               lprintf(1, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
-               return (-1);
+       if (MYTID != NULL) {
+               ret = dbp[cdb]->put(dbp[cdb],           /* db */
+                                       MYTID,          /* transaction ID */
+                                       &dkey,          /* key */
+                                       &ddata,         /* data */
+                                       0);             /* flags */
+               if (ret) {
+                       lprintf(1, "cdb_store(%d): %s\n", cdb,
+                               db_strerror(ret));
+               }
+               return ret;
+       } else {
+           retry:
+               if (txbegin(&tid))
+                       return -1;
+
+               if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
+                                        tid,           /* transaction ID */
+                                        &dkey,         /* key */
+                                        &ddata,        /* data */
+                                        0))) {         /* flags */
+                       if (ret == DB_LOCK_DEADLOCK) {
+                               if (txabort(tid))
+                                       return ret;
+                               else
+                                       goto retry;
+                       } else {
+                               lprintf(1, "cdb_store(%d): %s\n", cdb,
+                                       db_strerror(ret));
+                               txabort(tid);
+                               return ret;
+                       }
+               } else {
+                       return txcommit(tid);
+               }
        }
-       return (0);
 }
 
 
@@ -241,16 +352,36 @@ int cdb_delete(int cdb, void *key, int keylen)
 {
 
        DBT dkey;
+       DB_TXN *tid;
        int ret;
 
        dkey.size = keylen;
        dkey.data = key;
 
-       begin_critical_section(S_DATABASE);
-       ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
-       end_critical_section(S_DATABASE);
-       return (ret);
-
+       if (MYTID != NULL) {
+               ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
+               return (ret);
+       } else {
+           retry:
+               if (txbegin(&tid))
+                       return -1;
+
+               if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))) {
+                       if (ret == DB_LOCK_DEADLOCK) {
+                                       if (txabort(tid))
+                                               return ret;
+                                       else
+                                               goto retry;
+                       } else {
+                               lprintf(1, "cdb_store(%d): %s\n", cdb,
+                                       db_strerror(ret));
+                               txabort(tid);
+                               return ret;
+                       }
+               } else {
+                       return txcommit(tid);
+               }
+       }
 }
 
 
@@ -266,6 +397,7 @@ struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
 
        struct cdbdata *tempcdb;
        DBT dkey, dret;
+       DB_TXN *tid;
        int ret;
 
        memset(&dkey, 0, sizeof(DBT));
@@ -274,9 +406,24 @@ struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
        dkey.data = key;
        dret.flags = DB_DBT_MALLOC;
 
-       begin_critical_section(S_DATABASE);
-       ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
-       end_critical_section(S_DATABASE);
+       if (MYTID != NULL) {
+               ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
+       } else {
+           retry:
+               if (txbegin(&tid))
+                       return NULL;
+
+               ret = dbp[cdb]->get(dbp[cdb], tid, &dkey, &dret, 0);
+
+               if (ret == DB_LOCK_DEADLOCK) {
+                       if (txabort(tid))
+                               return NULL;
+                       else
+                               goto retry;
+               } else if (txcommit(tid))
+                       return NULL;
+       }
+
        if ((ret != 0) && (ret != DB_NOTFOUND)) {
                lprintf(1, "cdb_fetch: %s\n", db_strerror(ret));
        }
@@ -305,21 +452,27 @@ void cdb_free(struct cdbdata *cdb)
 /* 
  * Prepare for a sequential search of an entire database.
  * (There is guaranteed to be no more than one traversal in
- * progress per session at any given time.)
+ * progress per thread at any given time.)
  */
 void cdb_rewind(int cdb)
 {
        int ret = 0;
 
+       if (MYCURSOR != NULL)
+               MYCURSOR->c_close(MYCURSOR);
+
+       if (MYTID == NULL) {
+               lprintf(1, "ERROR: cursor use outside transaction\n");
+               abort();
+       }
+
        /*
         * Now initialize the cursor
         */
-       begin_critical_section(S_DATABASE);
        ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0);
        if (ret) {
                lprintf(1, "db_cursor: %s\n", db_strerror(ret));
        }
-       end_critical_section(S_DATABASE);
 }
 
 
@@ -338,12 +491,14 @@ struct cdbdata *cdb_next_item(int cdb)
         memset(&data, 0, sizeof(data));
        data.flags = DB_DBT_MALLOC;
 
-       begin_critical_section(S_DATABASE);
        ret = MYCURSOR->c_get(MYCURSOR,
                &key, &data, DB_NEXT);
-       end_critical_section(S_DATABASE);
        
-       if (ret) return NULL;           /* presumably, end of file */
+       if (ret) {
+               MYCURSOR->c_close(MYCURSOR);
+               MYCURSOR = NULL;
+               return NULL;            /* presumably, end of file */
+       }
 
        cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
        cdbret->len = data.size;
@@ -352,3 +507,30 @@ struct cdbdata *cdb_next_item(int cdb)
        return (cdbret);
 }
 
+
+/*
+ * Transaction-based stuff.  I'm writing this as I bake cookies...
+ */
+
+void cdb_begin_transaction(void) {
+
+       if (MYTID != NULL) {    /* FIXME this slows it down, take it out */
+               lprintf(1, "ERROR: opening a new transaction with one already open!\n");
+               abort();
+       }
+       else {
+               txbegin(&MYTID);
+       }
+}
+
+void cdb_end_transaction(void) {
+       if (MYCURSOR != NULL) {
+               lprintf(1, "WARNING: cursor still open at transaction end\n");
+               MYCURSOR->c_close(MYCURSOR);
+               MYCURSOR = NULL;
+       }
+       if (MYTID == NULL) lprintf(1, "ERROR: txcommit(NULL) !!\n");
+       else txcommit(MYTID);
+       MYTID = NULL;
+}
+
index e8e0df69b3270ea98133ee93805895b35bf477fe..48c7d00d2318577ddc787ca74e2eb925821f9105 100644 (file)
@@ -309,6 +309,7 @@ void ForEachRoom(void (*CallBack) (struct quickroom *EachRoom, void *out_data),
        struct quickroom qrbuf;
        struct cdbdata *cdbqr;
 
+       cdb_begin_transaction();
        cdb_rewind(CDB_QUICKROOM);
 
        while (cdbqr = cdb_next_item(CDB_QUICKROOM), cdbqr != NULL) {
@@ -321,6 +322,7 @@ void ForEachRoom(void (*CallBack) (struct quickroom *EachRoom, void *out_data),
                if (qrbuf.QRflags & QR_INUSE)
                        (*CallBack)(&qrbuf, in_data);
        }
+       cdb_end_transaction();
 }
 
 
@@ -791,6 +793,7 @@ void cmd_whok(void)
        struct usersupp temp;
        struct cdbdata *cdbus;
 
+       cdb_begin_transaction();
        getuser(&CC->usersupp, CC->curr_user);
        if (CtdlAccessCheck(ac_room_aide)) return;
 
@@ -806,6 +809,7 @@ void cmd_whok(void)
                    )
                        cprintf("%s\n", temp.fullname);
        }
+       cdb_end_transaction();
        cprintf("000\n");
 }
 
index 2019d58613ced93ea6f0542cb7ee63407dea0d73..7ebb33e651ae19d872ebe24671925b3e48fe6faa 100644 (file)
@@ -436,6 +436,7 @@ int PurgeVisits(void) {
        ForEachUser(AddValidUser, NULL);
 
        /* Now traverse through the visits, purging irrelevant records... */
+       cdb_begin_transaction();
        cdb_rewind(CDB_VISIT);
        while(cdbvisit = cdb_next_item(CDB_VISIT), cdbvisit != NULL) {
                memset(&vbuf, 0, sizeof(struct visit));
@@ -473,6 +474,8 @@ int PurgeVisits(void) {
 
        }
 
+       cdb_end_transaction();
+
        /* Free the valid room/gen combination list */
        while (ValidRoomList != NULL) {
                vrptr = ValidRoomList->next;
index 176194cd99d99bb4dac89ef587b94f857dccd8de..51d0e430f25baa4093cc18945c745627d15798c1 100644 (file)
@@ -49,6 +49,7 @@ void do_pre555_usersupp_upgrade(void) {
        strcpy(tempfilename, tmpnam(NULL));
 
        /* First, back out all old version records to a flat file */
+       cdb_begin_transaction();
         cdb_rewind(CDB_USERSUPP);
         while(cdbus = cdb_next_item(CDB_USERSUPP), cdbus != NULL) {
                 memset(&usbuf, 0, sizeof(struct pre555usersupp));
@@ -58,6 +59,7 @@ void do_pre555_usersupp_upgrade(void) {
                 cdb_free(cdbus);
                fwrite(&usbuf, sizeof(struct pre555usersupp), 1, fp);
        }
+       cdb_end_transaction();
 
        /* ...and overwrite the records with new format records */
        rewind(fp);
index 3534a183695fda95b0c2432cfd96f2613c18b008..d1cbd2db857f3f60b8e31e31c8f8e5168517abd5 100644 (file)
@@ -144,6 +144,7 @@ void artv_export_visits(void) {
        struct visit vbuf;
        struct cdbdata *cdbv;
 
+       cdb_begin_transaction();
        cdb_rewind(CDB_VISIT);
 
        while (cdbv = cdb_next_item(CDB_VISIT), cdbv != NULL) {
@@ -160,6 +161,7 @@ void artv_export_visits(void) {
                cprintf("%ld\n", vbuf.v_lastseen);
                cprintf("%u\n", vbuf.v_flags);
        }
+       cdb_end_transaction();
 }
 
 
index cdff7ca70a92e752e15b65a56a8f1273a9ff9401..4350f630fbf4ba4bd781f018bd54ed436f5e780c 100644 (file)
@@ -265,6 +265,7 @@ int getuserbynumber(struct usersupp *usbuf, long int number)
 {
        struct cdbdata *cdbus;
 
+       cdb_begin_transaction();
        cdb_rewind(CDB_USERSUPP);
 
        while (cdbus = cdb_next_item(CDB_USERSUPP), cdbus != NULL) {
@@ -274,9 +275,11 @@ int getuserbynumber(struct usersupp *usbuf, long int number)
                        sizeof(struct usersupp) : cdbus->len));
                cdb_free(cdbus);
                if (usbuf->usernum == number) {
+                       cdb_end_transaction();
                        return (0);
                }
        }
+       cdb_end_transaction();
        return (-1);
 }
 
@@ -940,6 +943,7 @@ void cmd_gnur(void)
        /* There are unvalidated users.  Traverse the usersupp database,
         * and return the first user we find that needs validation.
         */
+       cdb_begin_transaction();
        cdb_rewind(CDB_USERSUPP);
        while (cdbus = cdb_next_item(CDB_USERSUPP), cdbus != NULL) {
                memset(&usbuf, 0, sizeof(struct usersupp));
@@ -950,9 +954,11 @@ void cmd_gnur(void)
                if ((usbuf.flags & US_NEEDVALID)
                    && (usbuf.axlevel > 0)) {
                        cprintf("%d %s\n", MORE_DATA, usbuf.fullname);
+                       cdb_end_transaction();
                        return;
                }
        }
+       cdb_end_transaction();
 
        /* If we get to this point, there are no more unvalidated users.
         * Therefore we clear the "users need validation" flag.
@@ -1016,6 +1022,7 @@ void ForEachUser(void (*CallBack) (struct usersupp * EachUser, void *out_data),
        struct usersupp usbuf;
        struct cdbdata *cdbus;
 
+       cdb_begin_transaction();
        cdb_rewind(CDB_USERSUPP);
 
        while (cdbus = cdb_next_item(CDB_USERSUPP), cdbus != NULL) {
@@ -1026,6 +1033,7 @@ void ForEachUser(void (*CallBack) (struct usersupp * EachUser, void *out_data),
                cdb_free(cdbus);
                (*CallBack) (&usbuf, in_data);
        }
+       cdb_end_transaction();
 }