Building better abstractions for the backend interface
[citadel.git] / citadel / server / backends / berkeley_db / berkeley_db.c
1 // This is a data store backend for the Citadel server which uses Berkeley DB.
2 //
3 // Copyright (c) 1987-2023 by the citadel.org team
4 //
5 // This program is open source software.  Use, duplication, or disclosure
6 // is subject to the terms of the GNU General Public License, version 3.
7
8 // Citadel will checkpoint the db at the end of every session, but only if
9 // the specified number of kilobytes has been written, or if the specified
10 // number of minutes has passed, since the last checkpoint.
11 #define MAX_CHECKPOINT_KBYTES   256
12 #define MAX_CHECKPOINT_MINUTES  15
13
14 #include "../../sysdep.h"
15 #include <stdlib.h>
16 #include <unistd.h>
17 #include <sys/stat.h>
18 #include <stdio.h>
19 #include <dirent.h>
20 #include <zlib.h>
21 #include <db.h>
22
23 #if DB_VERSION_MAJOR < 18
24 #error Citadel requires Berkeley DB v18.0 or newer.  Please upgrade.
25 #endif
26
27 #include <libcitadel.h>
28 #include "../../ctdl_module.h"
29 #include "../../control.h"
30 #include "../../citserver.h"
31 #include "../../config.h"
32 #include "berkeley_db.h"
33
34 void                    (*cdb_open_databases)(void)                             = NULL;
35 void                    (*cdb_close_databases)(void)                            = NULL;
36 int                     (*cdb_store)(int, const void *, int, void *, int)       = NULL;
37 int                     (*cdb_delete)(int, void *, int)                         = NULL;
38 void                    (*cdb_free)(struct cdbdata *)                           = NULL;
39 struct cdbdata *        (*cdb_next_item)(int)                                   = NULL;
40 void                    (*cdb_close_cursor)(int)                                = NULL;
41 void                    (*cdb_begin_transaction)(void)                          = NULL;
42 void                    (*cdb_end_transaction)(void)                            = NULL;
43 void                    (*cdb_check_handles)(void)                              = NULL;
44 void                    (*cdb_trunc)(int)                                       = NULL;
45 void                    (*cdb_chmod_data)(void)                                 = NULL;
46 void                    (*check_handles)(void *)                                = NULL;
47 void                    (*cdb_compact)(void)                                    = NULL;
48 void                    (*cdb_checkpoint)(void)                                 = NULL;
49 void                    (*cdb_rewind)(int)                                      = NULL;
50 struct cdbdata *        (*cdb_fetch)(int, const void *, int)                    = NULL;
51
52 static DB *dbp[MAXCDB];         // One DB handle for each Citadel database
53 static DB_ENV *dbenv;           // The DB environment (global)
54
55
56 void bdb_abort(void) {
57         syslog(LOG_DEBUG, "bdb: citserver is stopping in order to prevent data loss. uid=%d gid=%d euid=%d egid=%d",
58                 getuid(), getgid(), geteuid(), getegid()
59         );
60         raise(SIGABRT);         // This will exit in a way that can produce a core dump if needed.
61         exit(CTDLEXIT_DB);      // Exit if the signal failed to end the program.
62 }
63
64
65 // Verbose logging callback
66 void bdb_verbose_log(const DB_ENV *dbenv, const char *msg, const char *foo) {
67         if (!IsEmptyStr(msg)) {
68                 syslog(LOG_DEBUG, "bdb: %s %s", msg, foo);
69         }
70 }
71
72
73 // Verbose logging callback
74 void bdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg) {
75         syslog(LOG_ERR, "bdb: %s", msg);
76 }
77
78
79 // wrapper for txn_abort() that logs/aborts on error
80 static void txabort(DB_TXN *tid) {
81         int ret;
82
83         ret = tid->abort(tid);
84
85         if (ret) {
86                 syslog(LOG_ERR, "bdb: txn_abort: %s", db_strerror(ret));
87                 bdb_abort();
88         }
89 }
90
91
92 // wrapper for txn_commit() that logs/aborts on error
93 static void txcommit(DB_TXN *tid) {
94         int ret;
95
96         ret = tid->commit(tid, 0);
97
98         if (ret) {
99                 syslog(LOG_ERR, "bdb: txn_commit: %s", db_strerror(ret));
100                 bdb_abort();
101         }
102 }
103
104
105 // wrapper for txn_begin() that logs/aborts on error
106 static void txbegin(DB_TXN **tid) {
107         int ret;
108
109         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
110
111         if (ret) {
112                 syslog(LOG_ERR, "bdb: txn_begin: %s", db_strerror(ret));
113                 bdb_abort();
114         }
115 }
116
117
118 // panic callback
119 static void dbpanic(DB_ENV *env, int errval) {
120         syslog(LOG_ERR, "bdb: PANIC: %s", db_strerror(errval));
121         bdb_abort();
122 }
123
124
125 static void cclose(DBC *cursor) {
126         int ret;
127
128         if ((ret = cursor->c_close(cursor))) {
129                 syslog(LOG_ERR, "bdb: c_close: %s", db_strerror(ret));
130                 bdb_abort();
131         }
132 }
133
134
135 static void bailIfCursor(DBC **cursors, const char *msg) {
136         int i;
137
138         for (i = 0; i < MAXCDB; i++)
139                 if (cursors[i] != NULL) {
140                         syslog(LOG_ERR, "bdb: cursor still in progress on cdb %02x: %s", i, msg);
141                         bdb_abort();
142                 }
143 }
144
145
146 void bdb_check_handles(void) {
147         bailIfCursor(TSD->cursors, "in check_handles");
148
149         if (TSD->tid != NULL) {
150                 syslog(LOG_ERR, "bdb: transaction still in progress!");
151                 bdb_abort();
152         }
153 }
154
155
156 // Request a checkpoint of the database.  Called once per minute by the thread manager.
157 void bdb_checkpoint(void) {
158         int ret;
159
160         syslog(LOG_DEBUG, "bdb: -- checkpoint --");
161         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
162
163         if (ret != 0) {
164                 syslog(LOG_ERR, "bdb: bdb_checkpoint() txn_checkpoint: %s", db_strerror(ret));
165                 bdb_abort();
166         }
167
168         // After a successful checkpoint, we can cull the unused logs
169         if (CtdlGetConfigInt("c_auto_cull")) {
170                 ret = dbenv->log_set_config(dbenv, DB_LOG_AUTO_REMOVE, 1);
171         }
172         else {
173                 ret = dbenv->log_set_config(dbenv, DB_LOG_AUTO_REMOVE, 0);
174         }
175 }
176
177
178 // Open the various databases we'll be using.  Any database which
179 // does not exist should be created.  Note that we don't need a
180 // critical section here, because there aren't any active threads
181 // manipulating the database yet.
182 void bdb_open_databases(void) {
183         int ret;
184         int i;
185         char dbfilename[32];
186         u_int32_t flags = 0;
187         int dbversion_major, dbversion_minor, dbversion_patch;
188
189         syslog(LOG_DEBUG, "bdb: bdb_open_databases() starting");
190         syslog(LOG_DEBUG, "bdb:    Linked zlib: %s", zlibVersion());
191         syslog(LOG_DEBUG, "bdb: Compiled libdb: %s", DB_VERSION_STRING);
192         syslog(LOG_DEBUG, "bdb:   Linked libdb: %s", db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
193
194         // Create synthetic integer version numbers and compare them.
195         // Never allow citserver to run with a libdb older then the one with which it was compiled.
196         int compiled_db_version = ( (DB_VERSION_MAJOR * 1000000) + (DB_VERSION_MINOR * 1000) + (DB_VERSION_PATCH) );
197         int linked_db_version = ( (dbversion_major * 1000000) + (dbversion_minor * 1000) + (dbversion_patch) );
198         if (compiled_db_version > linked_db_version) {
199                 syslog(LOG_ERR, "bdb: citserver is running with a version of libdb older than the one with which it was compiled.");
200                 syslog(LOG_ERR, "bdb: This is an invalid configuration.  citserver will now exit to prevent data loss.");
201                 exit(CTDLEXIT_DB);
202         }
203
204         // Silently try to create the database subdirectory.  If it's already there, no problem.
205         if ((mkdir(ctdl_db_dir, 0700) != 0) && (errno != EEXIST)) {
206                 syslog(LOG_ERR, "bdb: database directory [%s] does not exist and could not be created: %m", ctdl_db_dir);
207                 exit(CTDLEXIT_DB);
208         }
209         if (chmod(ctdl_db_dir, 0700) != 0) {
210                 syslog(LOG_ERR, "bdb: unable to set database directory permissions [%s]: %m", ctdl_db_dir);
211                 exit(CTDLEXIT_DB);
212         }
213         if (chown(ctdl_db_dir, CTDLUID, (-1)) != 0) {
214                 syslog(LOG_ERR, "bdb: unable to set the owner for [%s]: %m", ctdl_db_dir);
215                 exit(CTDLEXIT_DB);
216         }
217         syslog(LOG_DEBUG, "bdb: Setting up DB environment");
218         ret = db_env_create(&dbenv, 0);
219         if (ret) {
220                 syslog(LOG_ERR, "bdb: db_env_create: %s", db_strerror(ret));
221                 syslog(LOG_ERR, "bdb: exit code %d", ret);
222                 exit(CTDLEXIT_DB);
223         }
224         dbenv->set_errpfx(dbenv, "citserver");
225         dbenv->set_paniccall(dbenv, dbpanic);
226         dbenv->set_errcall(dbenv, bdb_verbose_err);
227         dbenv->set_msgcall(dbenv, bdb_verbose_log);
228         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
229         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
230
231         // We want to specify the shared memory buffer pool cachesize, but everything else is the default.
232         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
233         if (ret) {
234                 syslog(LOG_ERR, "bdb: set_cachesize: %s", db_strerror(ret));
235                 dbenv->close(dbenv, 0);
236                 syslog(LOG_ERR, "bdb: exit code %d", ret);
237                 exit(CTDLEXIT_DB);
238         }
239
240         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
241                 syslog(LOG_ERR, "bdb: set_lk_detect: %s", db_strerror(ret));
242                 dbenv->close(dbenv, 0);
243                 syslog(LOG_ERR, "bdb: exit code %d", ret);
244                 exit(CTDLEXIT_DB);
245         }
246
247         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_INIT_LOG;
248         syslog(LOG_DEBUG, "bdb: dbenv->open(dbenv, %s, %d, 0)", ctdl_db_dir, flags);
249         ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                                // try opening the database cleanly
250         if (ret == DB_RUNRECOVERY) {
251                 syslog(LOG_ERR, "bdb: dbenv->open: %s", db_strerror(ret));
252                 syslog(LOG_ERR, "bdb: attempting recovery...");
253                 flags |= DB_RECOVER;
254                 ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                        // try recovery
255         }
256         if (ret == DB_RUNRECOVERY) {
257                 syslog(LOG_ERR, "bdb: dbenv->open: %s", db_strerror(ret));
258                 syslog(LOG_ERR, "bdb: attempting catastrophic recovery...");
259                 flags &= ~DB_RECOVER;
260                 flags |= DB_RECOVER_FATAL;
261                 ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                        // try catastrophic recovery
262         }
263         if (ret) {
264                 syslog(LOG_ERR, "bdb: dbenv->open: %s", db_strerror(ret));
265                 dbenv->close(dbenv, 0);
266                 syslog(LOG_ERR, "bdb: exit code %d", ret);
267                 exit(CTDLEXIT_DB);
268         }
269
270         syslog(LOG_INFO, "bdb: mounting databases");
271         for (i = 0; i < MAXCDB; ++i) {
272                 ret = db_create(&dbp[i], dbenv, 0);                                     // Create a database handle
273                 if (ret) {
274                         syslog(LOG_ERR, "bdb: db_create: %s", db_strerror(ret));
275                         syslog(LOG_ERR, "bdb: exit code %d", ret);
276                         exit(CTDLEXIT_DB);
277                 }
278
279                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);                 // table names by number
280                 ret = dbp[i]->open(dbp[i], NULL, dbfilename, NULL, DB_BTREE, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0600);
281                 if (ret) {
282                         syslog(LOG_ERR, "bdb: db_open[%02x]: %s", i, db_strerror(ret));
283                         if (ret == ENOMEM) {
284                                 syslog(LOG_ERR, "bdb: You may need to tune your database; please check http://www.citadel.org for more information.");
285                         }
286                         syslog(LOG_ERR, "bdb: exit code %d", ret);
287                         exit(CTDLEXIT_DB);
288                 }
289         }
290 }
291
292
293 // Make sure we own all the files, because in a few milliseconds we're going to drop root privs.
294 void bdb_chmod_data(void) {
295         DIR *dp;
296         struct dirent *d;
297         char filename[PATH_MAX];
298
299         dp = opendir(ctdl_db_dir);
300         if (dp != NULL) {
301                 while (d = readdir(dp), d != NULL) {
302                         if (d->d_name[0] != '.') {
303                                 snprintf(filename, sizeof filename, "%s/%s", ctdl_db_dir, d->d_name);
304                                 syslog(LOG_DEBUG, "bdb: chmod(%s, 0600) returned %d", filename, chmod(filename, 0600));
305                                 syslog(LOG_DEBUG, "bdb: chown(%s, CTDLUID, -1) returned %d", filename, chown(filename, CTDLUID, (-1)));
306                         }
307                 }
308                 closedir(dp);
309         }
310 }
311
312
313 // Close all of the db database files we've opened.  This can be done in a loop, since it's just a bunch of closes.
314 void bdb_close_databases(void) {
315         int i;
316         int ret;
317
318         static int closing = 0;
319         while (closing == 1) {
320                 syslog(LOG_INFO, "bdb: already closing");
321         }
322         closing = 1;
323
324         syslog(LOG_INFO, "bdb: performing final checkpoint");
325         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
326                 syslog(LOG_ERR, "bdb: txn_checkpoint: %s", db_strerror(ret));
327         }
328
329         syslog(LOG_INFO, "bdb: flushing the database logs");
330         if ((ret = dbenv->log_flush(dbenv, NULL))) {
331                 syslog(LOG_ERR, "bdb: log_flush: %s", db_strerror(ret));
332         }
333
334         // close the tables
335         syslog(LOG_INFO, "bdb: closing databases");
336         for (i = 0; i < MAXCDB; ++i) {
337                 syslog(LOG_INFO, "bdb: closing database %02x", i);
338                 ret = dbp[i]->close(dbp[i], 0);
339                 if (ret) {
340                         syslog(LOG_ERR, "bdb: db_close: %s", db_strerror(ret));
341                 }
342         }
343
344         // Close the handle.
345         ret = dbenv->close(dbenv, DB_FORCESYNC);
346         if (ret) {
347                 syslog(LOG_ERR, "bdb: DBENV->close: %s", db_strerror(ret));
348         }
349 }
350
351
352 // Decompress a database item if it was compressed on disk
353 void bdb_decompress_if_necessary(struct cdbdata *cdb) {
354         static int magic = COMPRESS_MAGIC;
355
356         if ((cdb == NULL) || (cdb->ptr == NULL) || (cdb->len < sizeof(magic)) || (memcmp(cdb->ptr, &magic, sizeof(magic)))) {
357                 return;
358         }
359
360         // At this point we know we're looking at a compressed item.
361
362         struct CtdlCompressHeader zheader;
363         char *uncompressed_data;
364         char *compressed_data;
365         uLongf destLen, sourceLen;
366         size_t cplen;
367
368         memset(&zheader, 0, sizeof(struct CtdlCompressHeader));
369         cplen = sizeof(struct CtdlCompressHeader);
370         if (sizeof(struct CtdlCompressHeader) > cdb->len) {
371                 cplen = cdb->len;
372         }
373         memcpy(&zheader, cdb->ptr, cplen);
374
375         compressed_data = cdb->ptr;
376         compressed_data += sizeof(struct CtdlCompressHeader);
377
378         sourceLen = (uLongf) zheader.compressed_len;
379         destLen = (uLongf) zheader.uncompressed_len;
380         uncompressed_data = malloc(zheader.uncompressed_len);
381
382         if (uncompress((Bytef *) uncompressed_data,
383                        (uLongf *) &destLen, (const Bytef *) compressed_data, (uLong) sourceLen) != Z_OK) {
384                 syslog(LOG_ERR, "bdb: uncompress() error");
385                 bdb_abort();
386         }
387
388         free(cdb->ptr);
389         cdb->len = (size_t) destLen;
390         cdb->ptr = uncompressed_data;
391 }
392
393
394 // Store a piece of data.  Returns 0 if the operation was successful.  If a
395 // key already exists it should be overwritten.
396 int bdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen) {
397
398         DBT dkey, ddata;
399         DB_TXN *tid = NULL;
400         int ret = 0;
401         struct CtdlCompressHeader zheader;
402         char *compressed_data = NULL;
403         int compressing = 0;
404         size_t buffer_len = 0;
405         uLongf destLen = 0;
406
407         memset(&dkey, 0, sizeof(DBT));
408         memset(&ddata, 0, sizeof(DBT));
409         dkey.size = ckeylen;
410         dkey.data = (void *) ckey;
411         ddata.size = cdatalen;
412         ddata.data = cdata;
413
414         // "visit" records are numerous and have big, mostly-empty string buffers in them.
415         // If we compress these we can get them down to 1% of their size most of the time.
416         if (cdb == CDB_VISIT) {
417                 compressing = 1;
418                 zheader.magic = COMPRESS_MAGIC;
419                 zheader.uncompressed_len = cdatalen;
420                 buffer_len = ((cdatalen * 101) / 100) + 100 + sizeof(struct CtdlCompressHeader);
421                 destLen = (uLongf) buffer_len;
422                 compressed_data = malloc(buffer_len);
423                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)), &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK) {
424                         syslog(LOG_ERR, "bdb: compress2() error");
425                         bdb_abort();
426                 }
427                 zheader.compressed_len = (size_t) destLen;
428                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
429                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
430                 ddata.data = compressed_data;
431         }
432
433         if (TSD->tid != NULL) {
434                 ret = dbp[cdb]->put(dbp[cdb],   // db
435                                     TSD->tid,   // transaction ID
436                                     &dkey,      // key
437                                     &ddata,     // data
438                                     0           // flags
439                 );
440                 if (ret) {
441                         syslog(LOG_ERR, "bdb: bdb_store(%d): %s", cdb, db_strerror(ret));
442                         bdb_abort();
443                 }
444                 if (compressing) {
445                         free(compressed_data);
446                 }
447                 return ret;
448         }
449         else {
450                 bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
451
452               retry:
453                 txbegin(&tid);
454
455                 if ((ret = dbp[cdb]->put(dbp[cdb],      // db
456                                          tid,           // transaction ID
457                                          &dkey,         // key
458                                          &ddata,        // data
459                                          0))) {         // flags
460                         if (ret == DB_LOCK_DEADLOCK) {
461                                 txabort(tid);
462                                 goto retry;
463                         }
464                         else {
465                                 syslog(LOG_ERR, "bdb: bdb_store(%d): %s", cdb, db_strerror(ret));
466                                 bdb_abort();
467                         }
468                 }
469                 else {
470                         txcommit(tid);
471                         if (compressing) {
472                                 free(compressed_data);
473                         }
474                         return ret;
475                 }
476         }
477         return ret;
478 }
479
480
481 // Delete a piece of data.  Returns 0 if the operation was successful.
482 int bdb_delete(int cdb, void *key, int keylen) {
483         DBT dkey;
484         DB_TXN *tid;
485         int ret;
486
487         memset(&dkey, 0, sizeof dkey);
488         dkey.size = keylen;
489         dkey.data = key;
490
491         if (TSD->tid != NULL) {
492                 ret = dbp[cdb]->del(dbp[cdb], TSD->tid, &dkey, 0);
493                 if (ret) {
494                         syslog(LOG_ERR, "bdb: bdb_delete(%d): %s", cdb, db_strerror(ret));
495                         if (ret != DB_NOTFOUND) {
496                                 bdb_abort();
497                         }
498                 }
499         }
500         else {
501                 bailIfCursor(TSD->cursors, "attempt to delete during r/o cursor");
502
503               retry:
504                 txbegin(&tid);
505
506                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0)) && ret != DB_NOTFOUND) {
507                         if (ret == DB_LOCK_DEADLOCK) {
508                                 txabort(tid);
509                                 goto retry;
510                         }
511                         else {
512                                 syslog(LOG_ERR, "bdb: bdb_delete(%d): %s", cdb, db_strerror(ret));
513                                 bdb_abort();
514                         }
515                 }
516                 else {
517                         txcommit(tid);
518                 }
519         }
520         return ret;
521 }
522
523
524 static DBC *localcursor(int cdb) {
525         int ret;
526         DBC *curs;
527
528         if (TSD->cursors[cdb] == NULL) {
529                 ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &curs, 0);
530         }
531         else {
532                 ret = TSD->cursors[cdb]->c_dup(TSD->cursors[cdb], &curs, DB_POSITION);
533         }
534
535         if (ret) {
536                 syslog(LOG_ERR, "bdb: localcursor: %s", db_strerror(ret));
537                 bdb_abort();
538         }
539
540         return curs;
541 }
542
543
544 // Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
545 // a struct cdbdata which it is the caller's responsibility to free later on
546 // using the bdb_free() routine.
547 struct cdbdata *bdb_fetch(int cdb, const void *key, int keylen) {
548
549         if (keylen == 0) {              // key length zero is impossible
550                 return(NULL);
551         }
552
553         struct cdbdata *tempcdb;
554         DBT dkey, dret;
555         int ret;
556
557         memset(&dkey, 0, sizeof(DBT));
558         dkey.size = keylen;
559         dkey.data = (void *) key;
560
561         if (TSD->tid != NULL) {
562                 memset(&dret, 0, sizeof(DBT));
563                 dret.flags = DB_DBT_MALLOC;
564                 ret = dbp[cdb]->get(dbp[cdb], TSD->tid, &dkey, &dret, 0);
565         }
566         else {
567                 DBC *curs;
568
569                 do {
570                         memset(&dret, 0, sizeof(DBT));
571                         dret.flags = DB_DBT_MALLOC;
572                         curs = localcursor(cdb);
573                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
574                         cclose(curs);
575                 } while (ret == DB_LOCK_DEADLOCK);
576         }
577
578         if ((ret != 0) && (ret != DB_NOTFOUND)) {
579                 syslog(LOG_ERR, "bdb: bdb_fetch(%d): %s", cdb, db_strerror(ret));
580                 bdb_abort();
581         }
582
583         if (ret != 0) {
584                 return NULL;
585         }
586
587         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
588         if (tempcdb == NULL) {
589                 syslog(LOG_ERR, "bdb: bdb_fetch() cannot allocate memory for tempcdb: %m");
590                 bdb_abort();
591         }
592         else {
593                 tempcdb->len = dret.size;
594                 tempcdb->ptr = dret.data;
595                 bdb_decompress_if_necessary(tempcdb);
596                 return (tempcdb);
597         }
598 }
599
600
601 // Free a cdbdata item.
602 //
603 // Note that we only free the 'ptr' portion if it is not NULL.  This allows
604 // other code to assume ownership of that memory simply by storing the
605 // pointer elsewhere and then setting 'ptr' to NULL.  bdb_free() will then
606 // avoid freeing it.
607 void bdb_free(struct cdbdata *cdb) {
608         if (cdb->ptr) {
609                 free(cdb->ptr);
610         }
611         free(cdb);
612 }
613
614
615 void bdb_close_cursor(int cdb) {
616         if (TSD->cursors[cdb] != NULL) {
617                 cclose(TSD->cursors[cdb]);
618         }
619
620         TSD->cursors[cdb] = NULL;
621 }
622
623
624 // Prepare for a sequential search of an entire database.
625 // (There is guaranteed to be no more than one traversal in
626 // progress per thread at any given time.)
627 void bdb_rewind(int cdb) {
628         int ret = 0;
629
630         if (TSD->cursors[cdb] != NULL) {
631                 syslog(LOG_ERR, "bdb: bdb_rewind: must close cursor on database %d before reopening", cdb);
632                 bdb_abort();
633                 // cclose(TSD->cursors[cdb]);
634         }
635
636         // Now initialize the cursor
637         ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &TSD->cursors[cdb], 0);
638         if (ret) {
639                 syslog(LOG_ERR, "bdb: bdb_rewind: db_cursor: %s", db_strerror(ret));
640                 bdb_abort();
641         }
642 }
643
644
645 // Fetch the next item in a sequential search.  Returns a pointer to a 
646 // cdbdata structure, or NULL if we've hit the end.
647 struct cdbdata *bdb_next_item(int cdb) {
648         DBT key, data;
649         struct cdbdata *cdbret;
650         int ret = 0;
651
652         // Initialize the key/data pair so the flags aren't set.
653         memset(&key, 0, sizeof(key));
654         memset(&data, 0, sizeof(data));
655         data.flags = DB_DBT_MALLOC;
656
657         ret = TSD->cursors[cdb]->c_get(TSD->cursors[cdb], &key, &data, DB_NEXT);
658
659         if (ret) {
660                 if (ret != DB_NOTFOUND) {
661                         syslog(LOG_ERR, "bdb: bdb_next_item(%d): %s", cdb, db_strerror(ret));
662                         bdb_abort();
663                 }
664                 bdb_close_cursor(cdb);
665                 return NULL;    // presumably, end of file
666         }
667
668         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
669         cdbret->len = data.size;
670         cdbret->ptr = data.data;
671         bdb_decompress_if_necessary(cdbret);
672
673         return (cdbret);
674 }
675
676
677 // Transaction-based stuff.  I'm writing this as I bake cookies...
678 void bdb_begin_transaction(void) {
679         bailIfCursor(TSD->cursors, "can't begin transaction during r/o cursor");
680
681         if (TSD->tid != NULL) {
682                 syslog(LOG_ERR, "bdb: bdb_begin_transaction: ERROR: nested transaction");
683                 bdb_abort();
684         }
685
686         txbegin(&TSD->tid);
687 }
688
689
690 void bdb_end_transaction(void) {
691         int i;
692
693         for (i = 0; i < MAXCDB; i++) {
694                 if (TSD->cursors[i] != NULL) {
695                         syslog(LOG_WARNING, "bdb: bdb_end_transaction: WARNING: cursor %d still open at transaction end", i);
696                         cclose(TSD->cursors[i]);
697                         TSD->cursors[i] = NULL;
698                 }
699         }
700
701         if (TSD->tid == NULL) {
702                 syslog(LOG_ERR, "bdb: bdb_end_transaction: ERROR: txcommit(NULL) !!");
703                 bdb_abort();
704         }
705         else {
706                 txcommit(TSD->tid);
707         }
708
709         TSD->tid = NULL;
710 }
711
712
713 // Truncate (delete every record)
714 void bdb_trunc(int cdb) {
715         int ret;
716         u_int32_t count;
717
718         if (TSD->tid != NULL) {
719                 syslog(LOG_ERR, "bdb: bdb_trunc must not be called in a transaction.");
720                 bdb_abort();
721         }
722         else {
723                 bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
724
725               retry:
726
727                 if ((ret = dbp[cdb]->truncate(dbp[cdb], // db
728                                               NULL,     // transaction ID
729                                               &count,   // #rows deleted
730                                               0))) {    // flags
731                         if (ret == DB_LOCK_DEADLOCK) {
732                                 goto retry;
733                         }
734                         else {
735                                 syslog(LOG_ERR, "bdb: bdb_truncate(%d): %s", cdb, db_strerror(ret));
736                                 if (ret == ENOMEM) {
737                                         syslog(LOG_ERR, "bdb: You may need to tune your database; please read http://www.citadel.org for more information.");
738                                 }
739                                 exit(CTDLEXIT_DB);
740                         }
741                 }
742         }
743 }
744
745
746 // compact (defragment) the database, possibly returning space back to the underlying filesystem
747 void bdb_compact(void) {
748         int ret;
749         int i;
750
751         syslog(LOG_DEBUG, "bdb: bdb_compact() started");
752         for (i = 0; i < MAXCDB; i++) {
753                 syslog(LOG_DEBUG, "bdb: compacting database %d", i);
754                 ret = dbp[i]->compact(dbp[i], NULL, NULL, NULL, NULL, DB_FREE_SPACE, NULL);
755                 if (ret) {
756                         syslog(LOG_ERR, "bdb: compact: %s", db_strerror(ret));
757                 }
758         }
759         syslog(LOG_DEBUG, "bdb: bdb_compact() finished");
760 }
761
762
763 // Calling this function activates the Berkeley DB back end.
764 void bdb_init_backend(void) {
765         cdb_compact = bdb_compact;
766         cdb_checkpoint = bdb_checkpoint;
767         cdb_rewind = bdb_rewind;
768         cdb_fetch = bdb_fetch;
769         cdb_open_databases = bdb_open_databases;
770         cdb_close_databases = bdb_close_databases;
771         cdb_store = bdb_store;
772         cdb_delete = bdb_delete;
773         cdb_free = bdb_free;
774         cdb_next_item = bdb_next_item;
775         cdb_close_cursor = bdb_close_cursor;
776         cdb_begin_transaction = bdb_begin_transaction;
777         cdb_end_transaction = bdb_end_transaction;
778         cdb_check_handles = bdb_check_handles;
779         cdb_trunc = bdb_trunc;
780         cdb_chmod_data = bdb_chmod_data;
781
782         syslog(LOG_INFO, "db: initialized Berkeley DB backend");
783 }
784
785