code.citadel.org Git - citadel.git/blobdiff - citadel/server/backends/berkeley_db/berkeley_db.c
berkeley_db.c: improvements to transactional data store
[citadel.git] / citadel / server / backends / berkeley_db / berkeley_db.c
index 147446f6e0038452b74e885d537ed53bbdd0392f..4d77a0b1483b33aba78e8a21ce7aa1a6a15b9be6 100644 (file)
@@ -112,19 +112,6 @@ static void bdb_txcommit(DB_TXN *tid) {
 }
 
 
-// Wrapper for txn_begin() that logs/aborts on error.  Not part of the backend API.
-static void bdb_txbegin(DB_TXN **tid) {
-       int ret;
-
-       ret = bdb_env->txn_begin(bdb_env, NULL, tid, 0);
-
-       if (ret) {
-               syslog(LOG_ERR, "bdb: txn_begin: %s", db_strerror(ret));
-               bdb_abort();
-       }
-}
-
-
 // Panic callback for Berkeley DB.  Not part of the backend API.
 static void bdb_dbpanic(DB_ENV *env, int errval) {
        syslog(LOG_ERR, "bdb: PANIC: %s", db_strerror(errval));
@@ -166,6 +153,48 @@ void bdb_check_handles(void) {
 }
 
 
+// Begin a Berkeley DB transaction and keep its handle in TSD->tid.  A second begin while one is stored is logged and sent to bdb_abort().
+void bdb_begin_transaction(void) {
+       int ret;                                                                        // return code from txn_begin()
+       bdb_bailIfCursor(TSD->cursors, "can't begin transaction during r/o cursor");    // presumably bails out if a cursor in TSD->cursors is open (helper not shown here)
+
+       if (TSD->tid != NULL) {                                                         // a transaction handle is already stored
+               syslog(LOG_ERR, "bdb: bdb_begin_transaction: ERROR: nested transaction");
+               bdb_abort();
+       }
+
+       ret = bdb_env->txn_begin(bdb_env, NULL, &TSD->tid, 0);                          // NULL = no parent transaction, 0 = default flags
+       if (ret) {                                                                      // nonzero means txn_begin() failed
+               syslog(LOG_ERR, "bdb: bdb_begin_transaction: %s", db_strerror(ret));
+               bdb_abort();
+       }
+}
+
+
+// Commit the transaction stored in TSD->tid and clear it, after closing any cursors still recorded in TSD->cursors.
+void bdb_end_transaction(void) {
+       int i;
+
+       for (i = 0; i < MAXCDB; i++) {                          // walk every cursor slot
+               if (TSD->cursors[i] != NULL) {                  // a cursor was left open: warn, then close it here
+                       syslog(LOG_WARNING, "bdb: bdb_end_transaction: WARNING: cursor %d still open at transaction end", i);
+                       bdb_cclose(TSD->cursors[i]);
+                       TSD->cursors[i] = NULL;                 // mark the slot as empty
+               }
+       }
+
+       if (TSD->tid == NULL) {                                 // no transaction handle is stored, so there is nothing to commit
+               syslog(LOG_ERR, "bdb: bdb_end_transaction: ERROR: bdb_txcommit(NULL) !!");
+               bdb_abort();
+       }
+       else {
+               bdb_txcommit(TSD->tid);
+       }
+
+       TSD->tid = NULL;                                        // no transaction is recorded from here on
+}
+
+
 // Request a checkpoint of the database.  Called once per minute by the thread manager.
 void bdb_checkpoint(void) {
        int ret;
@@ -226,42 +255,11 @@ void bdb_open_databases(void) {
        bdb_env->set_verbose(bdb_env, DB_VERB_DEADLOCK, 1);
        bdb_env->set_verbose(bdb_env, DB_VERB_RECOVERY, 1);
 
-       // We want to specify the shared memory buffer pool cachesize, but everything else is the default.
-       // 2023aug21 ajc: the third argument is zero, so this never did anything
-       // ret = bdb_env->set_cachesize(bdb_env, 0, 64 * 1024, 0);
-       // if (ret) {
-               // syslog(LOG_ERR, "bdb: set_cachesize: %s", db_strerror(ret));
-               // bdb_env->close(bdb_env, 0);
-               // syslog(LOG_ERR, "bdb: exit code %d", ret);
-               // exit(CTDLEXIT_DB);
-       // }
-
-       // This appears to do nothing over and above the default
-       //if ((ret = bdb_env->set_lk_detect(bdb_env, DB_LOCK_DEFAULT))) {
-               //syslog(LOG_ERR, "bdb: set_lk_detect: %s", db_strerror(ret));
-               //bdb_env->close(bdb_env, 0);
-               //syslog(LOG_ERR, "bdb: exit code %d", ret);
-               //exit(CTDLEXIT_DB);
-       //}
-
-       flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | /*DB_INIT_LOCK |*/ DB_THREAD | DB_INIT_LOG;
+       flags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD;
        syslog(LOG_DEBUG, "bdb: bdb_env->open(bdb_env, %s, %d, 0)", ctdl_db_dir, flags);
        ret = bdb_env->open(bdb_env, ctdl_db_dir, flags, 0);                            // try opening the database cleanly
-       if (ret == DB_RUNRECOVERY) {
-               syslog(LOG_ERR, "bdb: bdb_env->open: %s", db_strerror(ret));
-               syslog(LOG_ERR, "bdb: attempting recovery...");
-               flags |= DB_RECOVER;
-               ret = bdb_env->open(bdb_env, ctdl_db_dir, flags, 0);                    // try recovery
-       }
-       if (ret == DB_RUNRECOVERY) {
-               syslog(LOG_ERR, "bdb: bdb_env->open: %s", db_strerror(ret));
-               syslog(LOG_ERR, "bdb: attempting catastrophic recovery...");
-               flags &= ~DB_RECOVER;
-               flags |= DB_RECOVER_FATAL;
-               ret = bdb_env->open(bdb_env, ctdl_db_dir, flags, 0);                    // try catastrophic recovery
-       }
        if (ret) {
-               syslog(LOG_ERR, "bdb: bdb_env->open: %s", db_strerror(ret));
+               syslog(LOG_ERR, "bdb: bdb_env->open: %s: %s", ctdl_db_dir, db_strerror(ret));
                bdb_env->close(bdb_env, 0);
                syslog(LOG_ERR, "bdb: exit code %d", ret);
                exit(CTDLEXIT_DB);
@@ -386,6 +384,7 @@ int bdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
        int compressing = 0;
        size_t buffer_len = 0;
        uLongf destLen = 0;
+       int existing_txn = 0;                   // set to nonzero if we are already inside a transaction
 
        memset(&dkey, 0, sizeof(DBT));
        memset(&ddata, 0, sizeof(DBT));
@@ -414,49 +413,33 @@ int bdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
        }
 
        if (TSD->tid != NULL) {
-               ret = bdb_table[cdb]->put(bdb_table[cdb],       // db
-                                   TSD->tid,                   // transaction ID
-                                   &dkey,                      // key
-                                   &ddata,                     // data
-                                   0                           // flags
-               );
-               if (ret) {
-                       syslog(LOG_ERR, "bdb: bdb_store(%d): %s", cdb, db_strerror(ret));
-                       bdb_abort();
-               }
-               if (compressing) {
-                       free(compressed_data);
-               }
-               return ret;
+               existing_txn = 1;
        }
-       else {
-               bdb_bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
-
-             retry:
-               bdb_txbegin(&tid);
-
-               if ((ret = bdb_table[cdb]->put(bdb_table[cdb],  // db
-                                        tid,                   // transaction ID
-                                        &dkey,                 // key
-                                        &ddata,                // data
-                                        0))) {                 // flags
-                       if (ret == DB_LOCK_DEADLOCK) {
-                               bdb_txabort(tid);
-                               goto retry;
-                       }
-                       else {
-                               syslog(LOG_ERR, "bdb: bdb_store(%d): %s", cdb, db_strerror(ret));
-                               bdb_abort();
-                       }
-               }
-               else {
-                       bdb_txcommit(tid);
-                       if (compressing) {
-                               free(compressed_data);
-                       }
-                       return ret;
-               }
+
+       if (!existing_txn) {                            // If we're not already inside a transaction,
+               bdb_begin_transaction();                // create our own for this operation.
+       }
+
+       ret = bdb_table[cdb]->put(bdb_table[cdb],       // db
+                           TSD->tid,                   // transaction ID
+                           &dkey,                      // key
+                           &ddata,                     // data
+                           0                           // flags
+       );
+
+       if (ret) {
+               syslog(LOG_ERR, "bdb: bdb_store(%02x): %s", cdb, db_strerror(ret));
+               bdb_abort();
+       }
+
+       if (!existing_txn) {
+               bdb_end_transaction();
+       }
+
+       if (compressing) {
+               free(compressed_data);
        }
+
        return ret;
 }
 
@@ -464,68 +447,39 @@ int bdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
 // Delete a piece of data.  Returns 0 if the operation was successful.
 int bdb_delete(int cdb, void *key, int keylen) {
        DBT dkey;
-       DB_TXN *tid;
        int ret;
+       int existing_txn = 0;                           // set to nonzero if we are already inside a transaction
 
        memset(&dkey, 0, sizeof dkey);
        dkey.size = keylen;
        dkey.data = key;
 
        if (TSD->tid != NULL) {
-               ret = bdb_table[cdb]->del(bdb_table[cdb], TSD->tid, &dkey, 0);
-               if (ret) {
-                       syslog(LOG_ERR, "bdb: bdb_delete(%d): %s", cdb, db_strerror(ret));
-                       if (ret != DB_NOTFOUND) {
-                               bdb_abort();
-                       }
-               }
+               existing_txn = 1;                       // the transaction was begun elsewhere; it is not ended in this function
        }
-       else {
-               bdb_bailIfCursor(TSD->cursors, "attempt to delete during r/o cursor");
 
-             retry:
-               bdb_txbegin(&tid);
-
-               if ((ret = bdb_table[cdb]->del(bdb_table[cdb], tid, &dkey, 0)) && ret != DB_NOTFOUND) {
-                       if (ret == DB_LOCK_DEADLOCK) {
-                               bdb_txabort(tid);
-                               goto retry;
-                       }
-                       else {
-                               syslog(LOG_ERR, "bdb: bdb_delete(%d): %s", cdb, db_strerror(ret));
-                               bdb_abort();
-                       }
-               }
-               else {
-                       bdb_txcommit(tid);
-               }
+       if (!existing_txn) {                            // If we're not already inside a transaction,
+               bdb_begin_transaction();                // create our own for this operation.
        }
-       return ret;
-}
 
-
-static DBC *bdb_localcursor(int cdb) {
-       int ret;
-       DBC *curs;
-
-       if (TSD->cursors[cdb] == NULL) {
-               ret = bdb_table[cdb]->cursor(bdb_table[cdb], TSD->tid, &curs, 0);
-       }
-       else {
-               ret = TSD->cursors[cdb]->c_dup(TSD->cursors[cdb], &curs, DB_POSITION);
+       ret = bdb_table[cdb]->del(bdb_table[cdb], TSD->tid, &dkey, 0);  // delete under TSD->tid, whichever path set it
+       if (ret) {
+               if (ret != DB_NOTFOUND) {               // DB_NOTFOUND is neither logged nor sent to bdb_abort()
+                       syslog(LOG_ERR, "bdb: bdb_delete(%02x): %s", cdb, db_strerror(ret));
+                       bdb_abort();
+               }
        }
 
-       if (ret) {
-               syslog(LOG_ERR, "bdb: bdb_localcursor: %s", db_strerror(ret));
-               bdb_abort();
+       if (!existing_txn) {
+               bdb_end_transaction();                  // Only end the transaction if we began it.
        }
 
-       return curs;
+       return ret;                                     // 0 on success; DB_NOTFOUND is passed through to the caller
 }
 
 
 // Fetch a piece of data.  Returns a "struct cdbdata"
-// cdbdata.len will be 0 if the item is not found.
+// If the item is not found, the pointer will be NULL.
 struct cdbdata bdb_fetch(int cdb, const void *key, int keylen) {
 
        struct cdbdata returned_data;
@@ -542,20 +496,8 @@ struct cdbdata bdb_fetch(int cdb, const void *key, int keylen) {
        dkey.size = keylen;
        dkey.data = (void *) key;
 
-       if (TSD->tid != NULL) {
-               TSD->dbdata[cdb].flags = DB_DBT_REALLOC;
-               ret = bdb_table[cdb]->get(bdb_table[cdb], TSD->tid, &dkey, &TSD->dbdata[cdb], 0);
-       }
-       else {
-               DBC *curs;
-
-               do {
-                       TSD->dbdata[cdb].flags = DB_DBT_REALLOC;
-                       curs = bdb_localcursor(cdb);
-                       ret = curs->c_get(curs, &dkey, &TSD->dbdata[cdb], DB_SET);
-                       bdb_cclose(curs);
-               } while (ret == DB_LOCK_DEADLOCK);
-       }
+       TSD->dbdata[cdb].flags = DB_DBT_REALLOC;
+       ret = bdb_table[cdb]->get(bdb_table[cdb], TSD->tid, &dkey, &TSD->dbdata[cdb], 0);
 
        if ((ret != 0) && (ret != DB_NOTFOUND)) {
                syslog(LOG_ERR, "bdb: bdb_fetch(%d): %s", cdb, db_strerror(ret));
@@ -635,42 +577,6 @@ struct cdbkeyval bdb_next_item(int cdb) {
 }
 
 
-// Transaction-based stuff.  I'm writing this as I bake cookies...
-void bdb_begin_transaction(void) {
-       bdb_bailIfCursor(TSD->cursors, "can't begin transaction during r/o cursor");
-
-       if (TSD->tid != NULL) {
-               syslog(LOG_ERR, "bdb: bdb_begin_transaction: ERROR: nested transaction");
-               bdb_abort();
-       }
-
-       bdb_txbegin(&TSD->tid);
-}
-
-
-void bdb_end_transaction(void) {
-       int i;
-
-       for (i = 0; i < MAXCDB; i++) {
-               if (TSD->cursors[i] != NULL) {
-                       syslog(LOG_WARNING, "bdb: bdb_end_transaction: WARNING: cursor %d still open at transaction end", i);
-                       bdb_cclose(TSD->cursors[i]);
-                       TSD->cursors[i] = NULL;
-               }
-       }
-
-       if (TSD->tid == NULL) {
-               syslog(LOG_ERR, "bdb: bdb_end_transaction: ERROR: bdb_txcommit(NULL) !!");
-               bdb_abort();
-       }
-       else {
-               bdb_txcommit(TSD->tid);
-       }
-
-       TSD->tid = NULL;
-}
-
-
 // Truncate (delete every record)
 void bdb_trunc(int cdb) {
        int ret;
@@ -680,27 +586,21 @@ void bdb_trunc(int cdb) {
                syslog(LOG_ERR, "bdb: bdb_trunc must not be called in a transaction.");
                bdb_abort();
        }
-       else {
-               bdb_bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
-
-             retry:
 
-               if ((ret = bdb_table[cdb]->truncate(bdb_table[cdb],     // db
-                                             NULL,                     // transaction ID
-                                             &count,                   // #rows deleted
-                                             0))) {                    // flags
-                       if (ret == DB_LOCK_DEADLOCK) {
-                               goto retry;
-                       }
-                       else {
-                               syslog(LOG_ERR, "bdb: bdb_truncate(%d): %s", cdb, db_strerror(ret));
-                               if (ret == ENOMEM) {
-                                       syslog(LOG_ERR, "bdb: You may need to tune your database; please read http://www.citadel.org for more information.");
-                               }
-                               exit(CTDLEXIT_DB);
-                       }
+       bdb_begin_transaction();                                // create our own transaction for this operation.
+       ret = bdb_table[cdb]->truncate(bdb_table[cdb],          // db
+                                     NULL,                     // transaction ID
+                                     &count,                   // #rows deleted
+                                     0);                       // flags
+                                                               //
+       if (ret) {
+               syslog(LOG_ERR, "bdb: bdb_truncate(%d): %s", cdb, db_strerror(ret));
+               if (ret == ENOMEM) {
+                       syslog(LOG_ERR, "bdb: You may need to tune your database; please read http://www.citadel.org for more information.");
                }
+               exit(CTDLEXIT_DB);
        }
+       bdb_end_transaction();
 }