]> code.citadel.org Git - citadel.git/blob - citadel/server/database.c
Exit cleanly if citserver is running with a version of libdb older than the one with...
[citadel.git] / citadel / server / database.c
1 // This is a data store backend for the Citadel server which uses Berkeley DB.
2 //
3 // Copyright (c) 1987-2022 by the citadel.org team
4 //
5 // This program is open source software.  Use, duplication, or disclosure
6 // is subject to the terms of the GNU General Public License, version 3.
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Berkeley DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <sys/stat.h>
25 #include <stdio.h>
26 #include <dirent.h>
27 #include <zlib.h>
28 #include <db.h>
29
30 #if DB_VERSION_MAJOR < 5
31 #error Citadel requires Berkeley DB v5.0 or newer.  Please upgrade.
32 #endif
33
34 #include <libcitadel.h>
35 #include "ctdl_module.h"
36 #include "control.h"
37 #include "citserver.h"
38 #include "config.h"
39
40 static DB *dbp[MAXCDB];         // One DB handle for each Citadel database
41 static DB_ENV *dbenv;           // The DB environment (global)
42
43
44 void cdb_abort(void) {
45         syslog(LOG_DEBUG, "db: citserver is stopping in order to prevent data loss. uid=%d gid=%d euid=%d egid=%d",
46                 getuid(), getgid(), geteuid(), getegid()
47         );
48         raise(SIGABRT);         // This will exit in a way that can produce a core dump if needed.
49         exit(CTDLEXIT_DB);      // Exit if the signal failed to end the program.
50 }
51
52
53 // Verbose logging callback
54 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg, const char *what_is_this) {
55         if (!IsEmptyStr(msg)) {
56                 syslog(LOG_DEBUG, "db: %s", msg);
57         }
58 }
59
60
61 // Verbose logging callback
62 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg) {
63         syslog(LOG_ERR, "db: %s", msg);
64 }
65
66
67 // wrapper for txn_abort() that logs/aborts on error
68 static void txabort(DB_TXN *tid) {
69         int ret;
70
71         ret = tid->abort(tid);
72
73         if (ret) {
74                 syslog(LOG_ERR, "db: txn_abort: %s", db_strerror(ret));
75                 cdb_abort();
76         }
77 }
78
79
80 // wrapper for txn_commit() that logs/aborts on error
81 static void txcommit(DB_TXN *tid) {
82         int ret;
83
84         ret = tid->commit(tid, 0);
85
86         if (ret) {
87                 syslog(LOG_ERR, "db: txn_commit: %s", db_strerror(ret));
88                 cdb_abort();
89         }
90 }
91
92
93 // wrapper for txn_begin() that logs/aborts on error
94 static void txbegin(DB_TXN **tid) {
95         int ret;
96
97         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
98
99         if (ret) {
100                 syslog(LOG_ERR, "db: txn_begin: %s", db_strerror(ret));
101                 cdb_abort();
102         }
103 }
104
105
106 // panic callback
107 static void dbpanic(DB_ENV *env, int errval) {
108         syslog(LOG_ERR, "db: PANIC: %s", db_strerror(errval));
109         cdb_abort();
110 }
111
112
113 static void cclose(DBC *cursor) {
114         int ret;
115
116         if ((ret = cursor->c_close(cursor))) {
117                 syslog(LOG_ERR, "db: c_close: %s", db_strerror(ret));
118                 cdb_abort();
119         }
120 }
121
122
123 static void bailIfCursor(DBC **cursors, const char *msg) {
124         int i;
125
126         for (i = 0; i < MAXCDB; i++)
127                 if (cursors[i] != NULL) {
128                         syslog(LOG_ERR, "db: cursor still in progress on cdb %02x: %s", i, msg);
129                         cdb_abort();
130                 }
131 }
132
133
134 void cdb_check_handles(void) {
135         bailIfCursor(TSD->cursors, "in check_handles");
136
137         if (TSD->tid != NULL) {
138                 syslog(LOG_ERR, "db: transaction still in progress!");
139                 cdb_abort();
140         }
141 }
142
143
144 // Request a checkpoint of the database.  Called once per minute by the thread manager.
145 void cdb_checkpoint(void) {
146         int ret;
147
148         syslog(LOG_DEBUG, "db: -- checkpoint --");
149         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
150
151         if (ret != 0) {
152                 syslog(LOG_ERR, "db: cdb_checkpoint() txn_checkpoint: %s", db_strerror(ret));
153                 cdb_abort();
154         }
155
156         // After a successful checkpoint, we can cull the unused logs
157         if (CtdlGetConfigInt("c_auto_cull")) {
158                 ret = dbenv->log_set_config(dbenv, DB_LOG_AUTO_REMOVE, 1);
159         }
160         else {
161                 ret = dbenv->log_set_config(dbenv, DB_LOG_AUTO_REMOVE, 0);
162         }
163 }
164
165
166 // Open the various databases we'll be using.  Any database which
167 // does not exist should be created.  Note that we don't need a
168 // critical section here, because there aren't any active threads
169 // manipulating the database yet.
170 void open_databases(void) {
171         int ret;
172         int i;
173         char dbfilename[32];
174         u_int32_t flags = 0;
175         int dbversion_major, dbversion_minor, dbversion_patch;
176
177         syslog(LOG_DEBUG, "db: open_databases() starting");
178         syslog(LOG_DEBUG, "db:    Linked zlib: %s", zlibVersion());
179         syslog(LOG_DEBUG, "db: Compiled libdb: %s", DB_VERSION_STRING);
180         syslog(LOG_DEBUG, "db:   Linked libdb: %s", db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
181
182         // Create synthetic integer version numbers and compare them.
183         // Never allow citserver to run with a libdb older then the one with which it was compiled.
184         int compiled_db_version = ( (DB_VERSION_MAJOR * 1000000) + (DB_VERSION_MINOR * 1000) + (DB_VERSION_PATCH) );
185         int linked_db_version = ( (dbversion_major * 1000000) + (dbversion_minor * 1000) + (dbversion_patch) );
186         if (compiled_db_version > linked_db_version) {
187                 syslog(LOG_ERR, "db: citserver is running with a version of libdb older than the one with which it was compiled.");
188                 syslog(LOG_ERR, "db: This is an invalid configuration.  citserver will now exit to prevent data loss.");
189                 exit(CTDLEXIT_DB);
190         }
191
192         // Silently try to create the database subdirectory.  If it's already there, no problem.
193         if ((mkdir(ctdl_db_dir, 0700) != 0) && (errno != EEXIST)) {
194                 syslog(LOG_ERR, "db: database directory [%s] does not exist and could not be created: %m", ctdl_db_dir);
195                 exit(CTDLEXIT_DB);
196         }
197         if (chmod(ctdl_db_dir, 0700) != 0) {
198                 syslog(LOG_ERR, "db: unable to set database directory permissions [%s]: %m", ctdl_db_dir);
199                 exit(CTDLEXIT_DB);
200         }
201         if (chown(ctdl_db_dir, CTDLUID, (-1)) != 0) {
202                 syslog(LOG_ERR, "db: unable to set the owner for [%s]: %m", ctdl_db_dir);
203                 exit(CTDLEXIT_DB);
204         }
205         syslog(LOG_DEBUG, "db: Setting up DB environment");
206         ret = db_env_create(&dbenv, 0);
207         if (ret) {
208                 syslog(LOG_ERR, "db: db_env_create: %s", db_strerror(ret));
209                 syslog(LOG_ERR, "db: exit code %d", ret);
210                 exit(CTDLEXIT_DB);
211         }
212         dbenv->set_errpfx(dbenv, "citserver");
213         dbenv->set_paniccall(dbenv, dbpanic);
214         dbenv->set_errcall(dbenv, cdb_verbose_err);
215         dbenv->set_errpfx(dbenv, "ctdl");
216         dbenv->set_msgcall(dbenv, cdb_verbose_log);
217         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
218         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
219
220         // We want to specify the shared memory buffer pool cachesize, but everything else is the default.
221         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
222         if (ret) {
223                 syslog(LOG_ERR, "db: set_cachesize: %s", db_strerror(ret));
224                 dbenv->close(dbenv, 0);
225                 syslog(LOG_ERR, "db: exit code %d", ret);
226                 exit(CTDLEXIT_DB);
227         }
228
229         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
230                 syslog(LOG_ERR, "db: set_lk_detect: %s", db_strerror(ret));
231                 dbenv->close(dbenv, 0);
232                 syslog(LOG_ERR, "db: exit code %d", ret);
233                 exit(CTDLEXIT_DB);
234         }
235
236         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_INIT_LOG;
237         syslog(LOG_DEBUG, "db: dbenv->open(dbenv, %s, %d, 0)", ctdl_db_dir, flags);
238         ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                                // try opening the database cleanly
239         if (ret == DB_RUNRECOVERY) {
240                 syslog(LOG_ERR, "db: dbenv->open: %s", db_strerror(ret));
241                 syslog(LOG_ERR, "db: attempting recovery...");
242                 flags |= DB_RECOVER;
243                 ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                        // try recovery
244         }
245         if (ret == DB_RUNRECOVERY) {
246                 syslog(LOG_ERR, "db: dbenv->open: %s", db_strerror(ret));
247                 syslog(LOG_ERR, "db: attempting catastrophic recovery...");
248                 flags &= ~DB_RECOVER;
249                 flags |= DB_RECOVER_FATAL;
250                 ret = dbenv->open(dbenv, ctdl_db_dir, flags, 0);                        // try catastrophic recovery
251         }
252         if (ret) {
253                 syslog(LOG_ERR, "db: dbenv->open: %s", db_strerror(ret));
254                 dbenv->close(dbenv, 0);
255                 syslog(LOG_ERR, "db: exit code %d", ret);
256                 exit(CTDLEXIT_DB);
257         }
258
259         syslog(LOG_INFO, "db: mounting databases");
260         for (i = 0; i < MAXCDB; ++i) {
261                 ret = db_create(&dbp[i], dbenv, 0);                                     // Create a database handle
262                 if (ret) {
263                         syslog(LOG_ERR, "db: db_create: %s", db_strerror(ret));
264                         syslog(LOG_ERR, "db: exit code %d", ret);
265                         exit(CTDLEXIT_DB);
266                 }
267
268                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);                 // table names by number
269                 ret = dbp[i]->open(dbp[i], NULL, dbfilename, NULL, DB_BTREE, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0600);
270                 if (ret) {
271                         syslog(LOG_ERR, "db: db_open[%02x]: %s", i, db_strerror(ret));
272                         if (ret == ENOMEM) {
273                                 syslog(LOG_ERR, "db: You may need to tune your database; please check http://www.citadel.org for more information.");
274                         }
275                         syslog(LOG_ERR, "db: exit code %d", ret);
276                         exit(CTDLEXIT_DB);
277                 }
278         }
279 }
280
281
282 // Make sure we own all the files, because in a few milliseconds we're going to drop root privs.
283 void cdb_chmod_data(void) {
284         DIR *dp;
285         struct dirent *d;
286         char filename[PATH_MAX];
287
288         dp = opendir(ctdl_db_dir);
289         if (dp != NULL) {
290                 while (d = readdir(dp), d != NULL) {
291                         if (d->d_name[0] != '.') {
292                                 snprintf(filename, sizeof filename, "%s/%s", ctdl_db_dir, d->d_name);
293                                 syslog(LOG_DEBUG, "db: chmod(%s, 0600) returned %d", filename, chmod(filename, 0600));
294                                 syslog(LOG_DEBUG, "db: chown(%s, CTDLUID, -1) returned %d", filename, chown(filename, CTDLUID, (-1)));
295                         }
296                 }
297                 closedir(dp);
298         }
299 }
300
301
302 // Close all of the db database files we've opened.  This can be done in a loop, since it's just a bunch of closes.
303 void close_databases(void) {
304         int i;
305         int ret;
306
307         syslog(LOG_INFO, "db: performing final checkpoint");
308         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
309                 syslog(LOG_ERR, "db: txn_checkpoint: %s", db_strerror(ret));
310         }
311
312         syslog(LOG_INFO, "db: flushing the database logs");
313         if ((ret = dbenv->log_flush(dbenv, NULL))) {
314                 syslog(LOG_ERR, "db: log_flush: %s", db_strerror(ret));
315         }
316
317         // close the tables
318         syslog(LOG_INFO, "db: closing databases");
319         for (i = 0; i < MAXCDB; ++i) {
320                 syslog(LOG_INFO, "db: closing database %02x", i);
321                 ret = dbp[i]->close(dbp[i], 0);
322                 if (ret) {
323                         syslog(LOG_ERR, "db: db_close: %s", db_strerror(ret));
324                 }
325
326         }
327
328         // This seemed nifty at the time but did anyone really look at it?
329         // #ifdef DB_STAT_ALL
330         // dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
331         // #endif
332
333         // Close the handle.
334         ret = dbenv->close(dbenv, 0);
335         if (ret) {
336                 syslog(LOG_ERR, "db: DBENV->close: %s", db_strerror(ret));
337         }
338 }
339
340
341 // Decompress a database item if it was compressed on disk
342 void cdb_decompress_if_necessary(struct cdbdata *cdb) {
343         static int magic = COMPRESS_MAGIC;
344
345         if ((cdb == NULL) || (cdb->ptr == NULL) || (cdb->len < sizeof(magic)) || (memcmp(cdb->ptr, &magic, sizeof(magic)))) {
346                 return;
347         }
348
349         // At this point we know we're looking at a compressed item.
350
351         struct CtdlCompressHeader zheader;
352         char *uncompressed_data;
353         char *compressed_data;
354         uLongf destLen, sourceLen;
355         size_t cplen;
356
357         memset(&zheader, 0, sizeof(struct CtdlCompressHeader));
358         cplen = sizeof(struct CtdlCompressHeader);
359         if (sizeof(struct CtdlCompressHeader) > cdb->len) {
360                 cplen = cdb->len;
361         }
362         memcpy(&zheader, cdb->ptr, cplen);
363
364         compressed_data = cdb->ptr;
365         compressed_data += sizeof(struct CtdlCompressHeader);
366
367         sourceLen = (uLongf) zheader.compressed_len;
368         destLen = (uLongf) zheader.uncompressed_len;
369         uncompressed_data = malloc(zheader.uncompressed_len);
370
371         if (uncompress((Bytef *) uncompressed_data,
372                        (uLongf *) & destLen, (const Bytef *) compressed_data, (uLong) sourceLen) != Z_OK) {
373                 syslog(LOG_ERR, "db: uncompress() error");
374                 cdb_abort();
375         }
376
377         free(cdb->ptr);
378         cdb->len = (size_t) destLen;
379         cdb->ptr = uncompressed_data;
380 }
381
382
383 /*
384  * Store a piece of data.  Returns 0 if the operation was successful.  If a
385  * key already exists it should be overwritten.
386  */
387 int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen) {
388
389         DBT dkey, ddata;
390         DB_TXN *tid = NULL;
391         int ret = 0;
392         struct CtdlCompressHeader zheader;
393         char *compressed_data = NULL;
394         int compressing = 0;
395         size_t buffer_len = 0;
396         uLongf destLen = 0;
397
398         memset(&dkey, 0, sizeof(DBT));
399         memset(&ddata, 0, sizeof(DBT));
400         dkey.size = ckeylen;
401         dkey.data = (void *) ckey;
402         ddata.size = cdatalen;
403         ddata.data = cdata;
404
405         // Only compress Visit and UseTable records.  Everything else is uncompressed.
406         if ((cdb == CDB_VISIT) || (cdb == CDB_USETABLE)) {
407                 compressing = 1;
408                 zheader.magic = COMPRESS_MAGIC;
409                 zheader.uncompressed_len = cdatalen;
410                 buffer_len = ((cdatalen * 101) / 100) + 100 + sizeof(struct CtdlCompressHeader);
411                 destLen = (uLongf) buffer_len;
412                 compressed_data = malloc(buffer_len);
413                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
414                               &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK) {
415                         syslog(LOG_ERR, "db: compress2() error");
416                         cdb_abort();
417                 }
418                 zheader.compressed_len = (size_t) destLen;
419                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
420                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
421                 ddata.data = compressed_data;
422         }
423
424         if (TSD->tid != NULL) {
425                 ret = dbp[cdb]->put(dbp[cdb],   // db
426                                     TSD->tid,   // transaction ID
427                                     &dkey,      // key
428                                     &ddata,     // data
429                                     0           // flags
430                 );
431                 if (ret) {
432                         syslog(LOG_ERR, "db: cdb_store(%d): %s", cdb, db_strerror(ret));
433                         cdb_abort();
434                 }
435                 if (compressing) {
436                         free(compressed_data);
437                 }
438                 return ret;
439         }
440         else {
441                 bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
442
443               retry:
444                 txbegin(&tid);
445
446                 if ((ret = dbp[cdb]->put(dbp[cdb],      // db
447                                          tid,           // transaction ID
448                                          &dkey,         // key
449                                          &ddata,        // data
450                                          0))) {         // flags
451                         if (ret == DB_LOCK_DEADLOCK) {
452                                 txabort(tid);
453                                 goto retry;
454                         }
455                         else {
456                                 syslog(LOG_ERR, "db: cdb_store(%d): %s", cdb, db_strerror(ret));
457                                 cdb_abort();
458                         }
459                 }
460                 else {
461                         txcommit(tid);
462                         if (compressing) {
463                                 free(compressed_data);
464                         }
465                         return ret;
466                 }
467         }
468         return ret;
469 }
470
471
472 // Delete a piece of data.  Returns 0 if the operation was successful.
473 int cdb_delete(int cdb, void *key, int keylen) {
474         DBT dkey;
475         DB_TXN *tid;
476         int ret;
477
478         memset(&dkey, 0, sizeof dkey);
479         dkey.size = keylen;
480         dkey.data = key;
481
482         if (TSD->tid != NULL) {
483                 ret = dbp[cdb]->del(dbp[cdb], TSD->tid, &dkey, 0);
484                 if (ret) {
485                         syslog(LOG_ERR, "db: cdb_delete(%d): %s", cdb, db_strerror(ret));
486                         if (ret != DB_NOTFOUND) {
487                                 cdb_abort();
488                         }
489                 }
490         }
491         else {
492                 bailIfCursor(TSD->cursors, "attempt to delete during r/o cursor");
493
494               retry:
495                 txbegin(&tid);
496
497                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0)) && ret != DB_NOTFOUND) {
498                         if (ret == DB_LOCK_DEADLOCK) {
499                                 txabort(tid);
500                                 goto retry;
501                         }
502                         else {
503                                 syslog(LOG_ERR, "db: cdb_delete(%d): %s", cdb, db_strerror(ret));
504                                 cdb_abort();
505                         }
506                 }
507                 else {
508                         txcommit(tid);
509                 }
510         }
511         return ret;
512 }
513
514
515 static DBC *localcursor(int cdb) {
516         int ret;
517         DBC *curs;
518
519         if (TSD->cursors[cdb] == NULL) {
520                 ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &curs, 0);
521         }
522         else {
523                 ret = TSD->cursors[cdb]->c_dup(TSD->cursors[cdb], &curs, DB_POSITION);
524         }
525
526         if (ret) {
527                 syslog(LOG_ERR, "db: localcursor: %s", db_strerror(ret));
528                 cdb_abort();
529         }
530
531         return curs;
532 }
533
534
535 // Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
536 // a struct cdbdata which it is the caller's responsibility to free later on
537 // using the cdb_free() routine.
538 struct cdbdata *cdb_fetch(int cdb, const void *key, int keylen) {
539
540         if (keylen == 0) {              // key length zero is impossible
541                 return(NULL);
542         }
543
544         struct cdbdata *tempcdb;
545         DBT dkey, dret;
546         int ret;
547
548         memset(&dkey, 0, sizeof(DBT));
549         dkey.size = keylen;
550         dkey.data = (void *) key;
551
552         if (TSD->tid != NULL) {
553                 memset(&dret, 0, sizeof(DBT));
554                 dret.flags = DB_DBT_MALLOC;
555                 ret = dbp[cdb]->get(dbp[cdb], TSD->tid, &dkey, &dret, 0);
556         }
557         else {
558                 DBC *curs;
559
560                 do {
561                         memset(&dret, 0, sizeof(DBT));
562                         dret.flags = DB_DBT_MALLOC;
563                         curs = localcursor(cdb);
564                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
565                         cclose(curs);
566                 } while (ret == DB_LOCK_DEADLOCK);
567         }
568
569         if ((ret != 0) && (ret != DB_NOTFOUND)) {
570                 syslog(LOG_ERR, "db: cdb_fetch(%d): %s", cdb, db_strerror(ret));
571                 cdb_abort();
572         }
573
574         if (ret != 0) {
575                 return NULL;
576         }
577
578         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
579         if (tempcdb == NULL) {
580                 syslog(LOG_ERR, "db: cdb_fetch() cannot allocate memory for tempcdb: %m");
581                 cdb_abort();
582         }
583         else {
584                 tempcdb->len = dret.size;
585                 tempcdb->ptr = dret.data;
586                 cdb_decompress_if_necessary(tempcdb);
587                 return (tempcdb);
588         }
589 }
590
591
592 // Free a cdbdata item.
593 //
594 // Note that we only free the 'ptr' portion if it is not NULL.  This allows
595 // other code to assume ownership of that memory simply by storing the
596 // pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
597 // avoid freeing it.
598 void cdb_free(struct cdbdata *cdb) {
599         if (cdb->ptr) {
600                 free(cdb->ptr);
601         }
602         free(cdb);
603 }
604
605
606 void cdb_close_cursor(int cdb) {
607         if (TSD->cursors[cdb] != NULL) {
608                 cclose(TSD->cursors[cdb]);
609         }
610
611         TSD->cursors[cdb] = NULL;
612 }
613
614
615 // Prepare for a sequential search of an entire database.
616 // (There is guaranteed to be no more than one traversal in
617 // progress per thread at any given time.)
618 void cdb_rewind(int cdb) {
619         int ret = 0;
620
621         if (TSD->cursors[cdb] != NULL) {
622                 syslog(LOG_ERR, "db: cdb_rewind: must close cursor on database %d before reopening", cdb);
623                 cdb_abort();
624                 // cclose(TSD->cursors[cdb]);
625         }
626
627         // Now initialize the cursor
628         ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &TSD->cursors[cdb], 0);
629         if (ret) {
630                 syslog(LOG_ERR, "db: cdb_rewind: db_cursor: %s", db_strerror(ret));
631                 cdb_abort();
632         }
633 }
634
635
636 // Fetch the next item in a sequential search.  Returns a pointer to a 
637 // cdbdata structure, or NULL if we've hit the end.
638 struct cdbdata *cdb_next_item(int cdb) {
639         DBT key, data;
640         struct cdbdata *cdbret;
641         int ret = 0;
642
643         // Initialize the key/data pair so the flags aren't set.
644         memset(&key, 0, sizeof(key));
645         memset(&data, 0, sizeof(data));
646         data.flags = DB_DBT_MALLOC;
647
648         ret = TSD->cursors[cdb]->c_get(TSD->cursors[cdb], &key, &data, DB_NEXT);
649
650         if (ret) {
651                 if (ret != DB_NOTFOUND) {
652                         syslog(LOG_ERR, "db: cdb_next_item(%d): %s", cdb, db_strerror(ret));
653                         cdb_abort();
654                 }
655                 cdb_close_cursor(cdb);
656                 return NULL;    // presumably, end of file
657         }
658
659         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
660         cdbret->len = data.size;
661         cdbret->ptr = data.data;
662         cdb_decompress_if_necessary(cdbret);
663
664         return (cdbret);
665 }
666
667
668 // Transaction-based stuff.  I'm writing this as I bake cookies...
669 void cdb_begin_transaction(void) {
670         bailIfCursor(TSD->cursors, "can't begin transaction during r/o cursor");
671
672         if (TSD->tid != NULL) {
673                 syslog(LOG_ERR, "db: cdb_begin_transaction: ERROR: nested transaction");
674                 cdb_abort();
675         }
676
677         txbegin(&TSD->tid);
678 }
679
680
681 void cdb_end_transaction(void) {
682         int i;
683
684         for (i = 0; i < MAXCDB; i++) {
685                 if (TSD->cursors[i] != NULL) {
686                         syslog(LOG_WARNING, "db: cdb_end_transaction: WARNING: cursor %d still open at transaction end", i);
687                         cclose(TSD->cursors[i]);
688                         TSD->cursors[i] = NULL;
689                 }
690         }
691
692         if (TSD->tid == NULL) {
693                 syslog(LOG_ERR, "db: cdb_end_transaction: ERROR: txcommit(NULL) !!");
694                 cdb_abort();
695         }
696         else {
697                 txcommit(TSD->tid);
698         }
699
700         TSD->tid = NULL;
701 }
702
703
704 // Truncate (delete every record)
705 void cdb_trunc(int cdb) {
706         int ret;
707         u_int32_t count;
708
709         if (TSD->tid != NULL) {
710                 syslog(LOG_ERR, "db: cdb_trunc must not be called in a transaction.");
711                 cdb_abort();
712         }
713         else {
714                 bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
715
716               retry:
717
718                 if ((ret = dbp[cdb]->truncate(dbp[cdb], // db
719                                               NULL,     // transaction ID
720                                               &count,   // #rows deleted
721                                               0))) {    // flags
722                         if (ret == DB_LOCK_DEADLOCK) {
723                                 goto retry;
724                         }
725                         else {
726                                 syslog(LOG_ERR, "db: cdb_truncate(%d): %s", cdb, db_strerror(ret));
727                                 if (ret == ENOMEM) {
728                                         syslog(LOG_ERR, "db: You may need to tune your database; please read http://www.citadel.org for more information.");
729                                 }
730                                 exit(CTDLEXIT_DB);
731                         }
732                 }
733         }
734 }
735
736
737 // compact (defragment) the database, possibly returning space back to the underlying filesystem
738 void cdb_compact(void) {
739         int ret;
740         int i;
741
742         syslog(LOG_DEBUG, "db: cdb_compact() started");
743         for (i = 0; i < MAXCDB; i++) {
744                 syslog(LOG_DEBUG, "db: compacting database %d", i);
745                 ret = dbp[i]->compact(dbp[i], NULL, NULL, NULL, NULL, DB_FREE_SPACE, NULL);
746                 if (ret) {
747                         syslog(LOG_ERR, "db: compact: %s", db_strerror(ret));
748                 }
749         }
750         syslog(LOG_DEBUG, "db: cdb_compact() finished");
751 }
752
753
754 // Has an item already been seen (is it in the CDB_USETABLE) ?
755 // Returns 0 if it hasn't, 1 if it has
756 // In either case, writes the item to the database for next time.
757 int CheckIfAlreadySeen(StrBuf *guid) {
758         int found = 0;
759         struct UseTable ut;
760         struct cdbdata *cdbut;
761
762         syslog(LOG_DEBUG, "db: CheckIfAlreadySeen(%s)", ChrPtr(guid));
763         cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid));
764         if (cdbut != NULL) {
765                 found = 1;
766                 cdb_free(cdbut);
767         }
768
769         // (Re)write the record, to update the timestamp.  Zeroing it out makes it compress better.
770         memset(&ut, 0, sizeof(struct UseTable));
771         memcpy(ut.ut_msgid, SKEY(guid));
772         ut.ut_timestamp = time(NULL);
773         cdb_store(CDB_USETABLE, SKEY(guid), &ut, sizeof(struct UseTable));
774         return (found);
775 }
776
777
778 char *ctdl_module_init_database() {
779         if (!threading) {
780                 // nothing to do here
781         }
782
783         // return our module id for the log
784         return "database";
785 }