From 3c8413419861a188e48d941a31197e7c372a8d46 Mon Sep 17 00:00:00 2001 From: Art Cancro Date: Tue, 9 Jan 2001 05:39:45 +0000 Subject: [PATCH] * Merged in code from the TRANSACTIONS branch for testing. --- citadel/ChangeLog | 4 + citadel/database.c | 10 ++ citadel/database_sleepycat.c | 320 +++++++++++++++++++++++++++-------- citadel/room_ops.c | 4 + citadel/serv_expire.c | 3 + citadel/serv_upgrade.c | 2 + citadel/serv_vandelay.c | 2 + citadel/user_ops.c | 8 + 8 files changed, 284 insertions(+), 69 deletions(-) diff --git a/citadel/ChangeLog b/citadel/ChangeLog index 9b6b32b94..28d07edc6 100644 --- a/citadel/ChangeLog +++ b/citadel/ChangeLog @@ -1,4 +1,7 @@ $Log$ + Revision 573.62 2001/01/09 05:39:45 ajc + * Merged in code from the TRANSACTIONS branch for testing. + Revision 573.61 2000/12/30 06:17:17 ajc * Still more work on IMAP. Damn this is tedious. @@ -2259,3 +2262,4 @@ Sat Jul 11 00:20:48 EDT 1998 Nathan Bryant Fri Jul 10 1998 Art Cancro * Initial CVS import + diff --git a/citadel/database.c b/citadel/database.c index c9bd3988a..a95abbe17 100644 --- a/citadel/database.c +++ b/citadel/database.c @@ -338,3 +338,13 @@ struct cdbdata *cdb_next_item(int cdb) return (cdbret); } + +/* + * empty functions because GDBM doesn't have transaction support + */ + +void cdb_begin_transaction(void) { +} + +void cdb_end_transaction(void) { +} diff --git a/citadel/database_sleepycat.c b/citadel/database_sleepycat.c index f522fd1ae..d5c0c4e49 100644 --- a/citadel/database_sleepycat.c +++ b/citadel/database_sleepycat.c @@ -5,19 +5,29 @@ * */ +/***************************************************************************** + Tunable configuration parameters for the Sleepycat DB back end + *****************************************************************************/ + +/* Citadel will checkpoint the db at the end of every session, but only if + * the specified number of kilobytes has been written, or if the specified + * number of minutes has passed, since the last checkpoint. + */ +#define MAX_CHECKPOINT_KBYTES 0 +#define MAX_CHECKPOINT_MINUTES 15 + /*****************************************************************************/ #include "sysdep.h" #include #include #include -#include #include #include #include #include -#include #include +#include #include "citadel.h" #include "server.h" #include "citserver.h" @@ -28,38 +38,96 @@ DB *dbp[MAXCDB]; /* One DB handle for each Citadel database */ DB_ENV *dbenv; /* The DB environment (global) */ -struct cdbssd { /* Session-specific DB stuff */ +struct cdbtsd { /* Thread-specific DB stuff */ + DB_TXN *tid; /* Transaction handle */ DBC *cursor; /* Cursor, for traversals... */ }; -struct cdbssd *ssd_arr = NULL; int num_ssd = 0; -#define MYCURSOR ssd_arr[CC->cs_pid].cursor -#define MYTID NULL +static pthread_key_t tsdkey; + +#define MYCURSOR (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursor) +#define MYTID (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid) + +/* just a little helper function */ +static int txabort(DB_TXN *tid) { + int ret = txn_abort(tid); + + if (ret) + lprintf(1, "txn_abort: %s\n", db_strerror(ret)); + + return ret; +} + +/* this one is even more helpful than the last. */ +static int txcommit(DB_TXN *tid) { + int ret = txn_commit(tid, 0); + + if (ret) + lprintf(1, "txn_commit: %s\n", db_strerror(ret)); + + return ret; +} + +/* are you sensing a pattern yet? */ +static int txbegin(DB_TXN **tid) { + int ret = txn_begin(dbenv, NULL, tid, 0); + + if (ret) + lprintf(1, "txn_begin: %s\n", db_strerror(ret)); + + return ret; +} + +static void release_handles(void *arg) { + if (arg != NULL) { + struct cdbtsd *tsd = (struct cdbtsd *)arg; + + if (tsd->cursor != NULL) { + lprintf(1, "WARNING: cursor still in progress; " + "closing!\n"); + tsd->cursor->c_close(tsd->cursor); + } + + if (tsd->tid != NULL) { + lprintf(1, "ERROR: transaction still in progress; " + "aborting!\n"); + txabort(tsd->tid); + } + } +} + +static void dest_tsd(void *arg) { + if (arg != NULL) { + release_handles(arg); + phree(arg); + } +} /* - * Ensure that we have enough space for session-specific data. We don't + * Ensure that we have a key for thread-specific data. We don't * put anything in here that Citadel cares about; this is just database - * related stuff like cursors. + * related stuff like cursors and transactions. + * + * This should be called immediately after startup by any thread which wants + * to use database calls, except for whatever thread calls open_databases. */ -void cdb_allocate_ssd(void) { - /* - * Make sure we have a cursor allocated for this session - */ +void cdb_allocate_tsd(void) { + struct cdbtsd *tsd = mallok(sizeof *tsd); - lprintf(9, "num_ssd before realloc = %d\n", num_ssd); - if (num_ssd <= CC->cs_pid) { - num_ssd = CC->cs_pid + 1; - if (ssd_arr == NULL) { - ssd_arr = (struct cdbssd *) - mallok((sizeof(struct cdbssd) * num_ssd)); - } else { - ssd_arr = (struct cdbssd *) - reallok(ssd_arr, (sizeof(struct cdbssd) * num_ssd)); - } - } - lprintf(9, "num_ssd after realloc = %d\n", num_ssd); + tsd->tid = NULL; + tsd->cursor = NULL; + pthread_setspecific(tsdkey, tsd); +} + +void cdb_free_tsd(void) { + dest_tsd(pthread_getspecific(tsdkey)); + pthread_setspecific(tsdkey, NULL); +} + +void cdb_release_handles(void) { + release_handles(pthread_getspecific(tsdkey)); } @@ -77,6 +145,21 @@ void defrag_databases(void) +/* + * Request a checkpoint of the database. + */ +static void cdb_checkpoint(void) { + int ret; + + ret = txn_checkpoint(dbenv, + MAX_CHECKPOINT_KBYTES, + MAX_CHECKPOINT_MINUTES, + 0); + if (ret) { + lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret)); + } +} + /* * Open the various databases we'll be using. Any database which * does not exist should be created. Note that we don't need an S_DATABASE @@ -87,7 +170,7 @@ void open_databases(void) { int ret; int i; - char dbfilename[SIZ]; + char dbfilename[256]; u_int32_t flags = 0; lprintf(9, "open_databases() starting\n"); @@ -98,6 +181,7 @@ void open_databases(void) system("exec mkdir data 2>/dev/null"); lprintf(9, "Setting up DB environment\n"); + db_env_set_func_yield(sched_yield); ret = db_env_create(&dbenv, 0); if (ret) { lprintf(1, "db_env_create: %s\n", db_strerror(ret)); @@ -116,15 +200,14 @@ void open_databases(void) exit(ret); } - /* - * We specify DB_PRIVATE but not DB_INIT_LOCK or DB_THREAD, even - * though this is a multithreaded application. Since Citadel does all - * database access in S_DATABASE critical sections, access to the db - * is serialized already, so don't bother the database manager with - * it. Besides, it locks up when we do it that way. - */ - flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_LOG; - /* flags |= DB_INIT_LOCK | DB_THREAD; */ + if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) { + lprintf(1, "set_lk_detect: %s\n", db_strerror(ret)); + dbenv->close(dbenv, 0); + exit(ret); + } + + flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN| + DB_INIT_LOCK|DB_THREAD; ret = dbenv->open(dbenv, "./data", flags, 0); if (ret) { lprintf(1, "dbenv->open: %s\n", db_strerror(ret)); @@ -161,8 +244,13 @@ void open_databases(void) } } - cdb_allocate_ssd(); - CtdlRegisterSessionHook(cdb_allocate_ssd, EVT_START); + if ((ret = pthread_key_create(&tsdkey, dest_tsd))) { + lprintf(1, "pthread_key_create: %s\n", strerror(ret)); + exit(1); + } + + cdb_allocate_tsd(); + CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER); lprintf(9, "open_databases() finished\n"); } @@ -176,7 +264,12 @@ void close_databases(void) int a; int ret; - begin_critical_section(S_DATABASE); + cdb_free_tsd(); + + if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) { + lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret)); + } + for (a = 0; a < MAXCDB; ++a) { lprintf(7, "Closing database %d\n", a); ret = dbp[a]->close(dbp[a], 0); @@ -186,20 +279,13 @@ void close_databases(void) } - - /* Close the handle. */ ret = dbenv->close(dbenv, 0); if (ret) { lprintf(1, "DBENV->close: %s\n", db_strerror(ret)); } - - - end_critical_section(S_DATABASE); - } - /* * Store a piece of data. Returns 0 if the operation was successful. If a * key already exists it should be overwritten. @@ -210,6 +296,7 @@ int cdb_store(int cdb, { DBT dkey, ddata; + DB_TXN *tid; int ret; memset(&dkey, 0, sizeof(DBT)); @@ -219,18 +306,42 @@ int cdb_store(int cdb, ddata.size = cdatalen; ddata.data = cdata; - begin_critical_section(S_DATABASE); - ret = dbp[cdb]->put(dbp[cdb], /* db */ - MYTID, /* transaction ID */ - &dkey, /* key */ - &ddata, /* data */ - 0); /* flags */ - end_critical_section(S_DATABASE); - if (ret) { - lprintf(1, "cdb_store(%d): %s\n", cdb, db_strerror(ret)); - return (-1); + if (MYTID != NULL) { + ret = dbp[cdb]->put(dbp[cdb], /* db */ + MYTID, /* transaction ID */ + &dkey, /* key */ + &ddata, /* data */ + 0); /* flags */ + if (ret) { + lprintf(1, "cdb_store(%d): %s\n", cdb, + db_strerror(ret)); + } + return ret; + } else { + retry: + if (txbegin(&tid)) + return -1; + + if ((ret = dbp[cdb]->put(dbp[cdb], /* db */ + tid, /* transaction ID */ + &dkey, /* key */ + &ddata, /* data */ + 0))) { /* flags */ + if (ret == DB_LOCK_DEADLOCK) { + if (txabort(tid)) + return ret; + else + goto retry; + } else { + lprintf(1, "cdb_store(%d): %s\n", cdb, + db_strerror(ret)); + txabort(tid); + return ret; + } + } else { + return txcommit(tid); + } } - return (0); } @@ -241,16 +352,36 @@ int cdb_delete(int cdb, void *key, int keylen) { DBT dkey; + DB_TXN *tid; int ret; dkey.size = keylen; dkey.data = key; - begin_critical_section(S_DATABASE); - ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0); - end_critical_section(S_DATABASE); - return (ret); - + if (MYTID != NULL) { + ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0); + return (ret); + } else { + retry: + if (txbegin(&tid)) + return -1; + + if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))) { + if (ret == DB_LOCK_DEADLOCK) { + if (txabort(tid)) + return ret; + else + goto retry; + } else { + lprintf(1, "cdb_store(%d): %s\n", cdb, + db_strerror(ret)); + txabort(tid); + return ret; + } + } else { + return txcommit(tid); + } + } } @@ -266,6 +397,7 @@ struct cdbdata *cdb_fetch(int cdb, void *key, int keylen) struct cdbdata *tempcdb; DBT dkey, dret; + DB_TXN *tid; int ret; memset(&dkey, 0, sizeof(DBT)); @@ -274,9 +406,24 @@ struct cdbdata *cdb_fetch(int cdb, void *key, int keylen) dkey.data = key; dret.flags = DB_DBT_MALLOC; - begin_critical_section(S_DATABASE); - ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0); - end_critical_section(S_DATABASE); + if (MYTID != NULL) { + ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0); + } else { + retry: + if (txbegin(&tid)) + return NULL; + + ret = dbp[cdb]->get(dbp[cdb], tid, &dkey, &dret, 0); + + if (ret == DB_LOCK_DEADLOCK) { + if (txabort(tid)) + return NULL; + else + goto retry; + } else if (txcommit(tid)) + return NULL; + } + if ((ret != 0) && (ret != DB_NOTFOUND)) { lprintf(1, "cdb_fetch: %s\n", db_strerror(ret)); } @@ -305,21 +452,27 @@ void cdb_free(struct cdbdata *cdb) /* * Prepare for a sequential search of an entire database. * (There is guaranteed to be no more than one traversal in - * progress per session at any given time.) + * progress per thread at any given time.) */ void cdb_rewind(int cdb) { int ret = 0; + if (MYCURSOR != NULL) + MYCURSOR->c_close(MYCURSOR); + + if (MYTID == NULL) { + lprintf(1, "ERROR: cursor use outside transaction\n"); + abort(); + } + /* * Now initialize the cursor */ - begin_critical_section(S_DATABASE); ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0); if (ret) { lprintf(1, "db_cursor: %s\n", db_strerror(ret)); } - end_critical_section(S_DATABASE); } @@ -338,12 +491,14 @@ struct cdbdata *cdb_next_item(int cdb) memset(&data, 0, sizeof(data)); data.flags = DB_DBT_MALLOC; - begin_critical_section(S_DATABASE); ret = MYCURSOR->c_get(MYCURSOR, &key, &data, DB_NEXT); - end_critical_section(S_DATABASE); - if (ret) return NULL; /* presumably, end of file */ + if (ret) { + MYCURSOR->c_close(MYCURSOR); + MYCURSOR = NULL; + return NULL; /* presumably, end of file */ + } cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata)); cdbret->len = data.size; @@ -352,3 +507,30 @@ struct cdbdata *cdb_next_item(int cdb) return (cdbret); } + +/* + * Transaction-based stuff. I'm writing this as I bake cookies... + */ + +void cdb_begin_transaction(void) { + + if (MYTID != NULL) { /* FIXME this slows it down, take it out */ + lprintf(1, "ERROR: opening a new transaction with one already open!\n"); + abort(); + } + else { + txbegin(&MYTID); + } +} + +void cdb_end_transaction(void) { + if (MYCURSOR != NULL) { + lprintf(1, "WARNING: cursor still open at transaction end\n"); + MYCURSOR->c_close(MYCURSOR); + MYCURSOR = NULL; + } + if (MYTID == NULL) lprintf(1, "ERROR: txcommit(NULL) !!\n"); + else txcommit(MYTID); + MYTID = NULL; +} + diff --git a/citadel/room_ops.c b/citadel/room_ops.c index e8e0df69b..48c7d00d2 100644 --- a/citadel/room_ops.c +++ b/citadel/room_ops.c @@ -309,6 +309,7 @@ void ForEachRoom(void (*CallBack) (struct quickroom *EachRoom, void *out_data), struct quickroom qrbuf; struct cdbdata *cdbqr; + cdb_begin_transaction(); cdb_rewind(CDB_QUICKROOM); while (cdbqr = cdb_next_item(CDB_QUICKROOM), cdbqr != NULL) { @@ -321,6 +322,7 @@ void ForEachRoom(void (*CallBack) (struct quickroom *EachRoom, void *out_data), if (qrbuf.QRflags & QR_INUSE) (*CallBack)(&qrbuf, in_data); } + cdb_end_transaction(); } @@ -791,6 +793,7 @@ void cmd_whok(void) struct usersupp temp; struct cdbdata *cdbus; + cdb_begin_transaction(); getuser(&CC->usersupp, CC->curr_user); if (CtdlAccessCheck(ac_room_aide)) return; @@ -806,6 +809,7 @@ void cmd_whok(void) ) cprintf("%s\n", temp.fullname); } + cdb_end_transaction(); cprintf("000\n"); } diff --git a/citadel/serv_expire.c b/citadel/serv_expire.c index 2019d5861..7ebb33e65 100644 --- a/citadel/serv_expire.c +++ b/citadel/serv_expire.c @@ -436,6 +436,7 @@ int PurgeVisits(void) { ForEachUser(AddValidUser, NULL); /* Now traverse through the visits, purging irrelevant records... */ + cdb_begin_transaction(); cdb_rewind(CDB_VISIT); while(cdbvisit = cdb_next_item(CDB_VISIT), cdbvisit != NULL) { memset(&vbuf, 0, sizeof(struct visit)); @@ -473,6 +474,8 @@ int PurgeVisits(void) { } + cdb_end_transaction(); + /* Free the valid room/gen combination list */ while (ValidRoomList != NULL) { vrptr = ValidRoomList->next; diff --git a/citadel/serv_upgrade.c b/citadel/serv_upgrade.c index 176194cd9..51d0e430f 100644 --- a/citadel/serv_upgrade.c +++ b/citadel/serv_upgrade.c @@ -49,6 +49,7 @@ void do_pre555_usersupp_upgrade(void) { strcpy(tempfilename, tmpnam(NULL)); /* First, back out all old version records to a flat file */ + cdb_begin_transaction(); cdb_rewind(CDB_USERSUPP); while(cdbus = cdb_next_item(CDB_USERSUPP), cdbus != NULL) { memset(&usbuf, 0, sizeof(struct pre555usersupp)); @@ -58,6 +59,7 @@ void do_pre555_usersupp_upgrade(void) { cdb_free(cdbus); fwrite(&usbuf, sizeof(struct pre555usersupp), 1, fp); } + cdb_end_transaction(); /* ...and overwrite the records with new format records */ rewind(fp); diff --git a/citadel/serv_vandelay.c b/citadel/serv_vandelay.c index 3534a1836..d1cbd2db8 100644 --- a/citadel/serv_vandelay.c +++ b/citadel/serv_vandelay.c @@ -144,6 +144,7 @@ void artv_export_visits(void) { struct visit vbuf; struct cdbdata *cdbv; + cdb_begin_transaction(); cdb_rewind(CDB_VISIT); while (cdbv = cdb_next_item(CDB_VISIT), cdbv != NULL) { @@ -160,6 +161,7 @@ void artv_export_visits(void) { cprintf("%ld\n", vbuf.v_lastseen); cprintf("%u\n", vbuf.v_flags); } + cdb_end_transaction(); } diff --git a/citadel/user_ops.c b/citadel/user_ops.c index cdff7ca70..4350f630f 100644 --- a/citadel/user_ops.c +++ b/citadel/user_ops.c @@ -265,6 +265,7 @@ int getuserbynumber(struct usersupp *usbuf, long int number) { struct cdbdata *cdbus; + cdb_begin_transaction(); cdb_rewind(CDB_USERSUPP); while (cdbus = cdb_next_item(CDB_USERSUPP), cdbus != NULL) { @@ -274,9 +275,11 @@ int getuserbynumber(struct usersupp *usbuf, long int number) sizeof(struct usersupp) : cdbus->len)); cdb_free(cdbus); if (usbuf->usernum == number) { + cdb_end_transaction(); return (0); } } + cdb_end_transaction(); return (-1); } @@ -940,6 +943,7 @@ void cmd_gnur(void) /* There are unvalidated users. Traverse the usersupp database, * and return the first user we find that needs validation. */ + cdb_begin_transaction(); cdb_rewind(CDB_USERSUPP); while (cdbus = cdb_next_item(CDB_USERSUPP), cdbus != NULL) { memset(&usbuf, 0, sizeof(struct usersupp)); @@ -950,9 +954,11 @@ void cmd_gnur(void) if ((usbuf.flags & US_NEEDVALID) && (usbuf.axlevel > 0)) { cprintf("%d %s\n", MORE_DATA, usbuf.fullname); + cdb_end_transaction(); return; } } + cdb_end_transaction(); /* If we get to this point, there are no more unvalidated users. * Therefore we clear the "users need validation" flag. @@ -1016,6 +1022,7 @@ void ForEachUser(void (*CallBack) (struct usersupp * EachUser, void *out_data), struct usersupp usbuf; struct cdbdata *cdbus; + cdb_begin_transaction(); cdb_rewind(CDB_USERSUPP); while (cdbus = cdb_next_item(CDB_USERSUPP), cdbus != NULL) { @@ -1026,6 +1033,7 @@ void ForEachUser(void (*CallBack) (struct usersupp * EachUser, void *out_data), cdb_free(cdbus); (*CallBack) (&usbuf, in_data); } + cdb_end_transaction(); } -- 2.30.2