9aa870091d6f1dc4684e20617a9149823cff6305
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Berkeley DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55 #include "control.h"
56
57 #include "ctdl_module.h"
58
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
71 {
72         if (!IsEmptyStr(msg)) {
73                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
74         }
75 }
76
77
78 /* Verbose logging callback */
79 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
80 {
81         CtdlLogPrintf(CTDL_ALERT, "BDB: %s\n", msg);
82 }
83
84
85 /* just a little helper function */
86 static void txabort(DB_TXN * tid)
87 {
88         int ret;
89
90         ret = tid->abort(tid);
91
92         if (ret) {
93                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
94                         db_strerror(ret));
95                 abort();
96         }
97 }
98
99 /* this one is even more helpful than the last. */
100 static void txcommit(DB_TXN * tid)
101 {
102         int ret;
103
104         ret = tid->commit(tid, 0);
105
106         if (ret) {
107                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n",
108                         db_strerror(ret));
109                 abort();
110         }
111 }
112
113 /* are you sensing a pattern yet? */
114 static void txbegin(DB_TXN ** tid)
115 {
116         int ret;
117
118         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
119
120         if (ret) {
121                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n",
122                         db_strerror(ret));
123                 abort();
124         }
125 }
126
127 static void dbpanic(DB_ENV * env, int errval)
128 {
129         CtdlLogPrintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
130 }
131
132 static void cclose(DBC * cursor)
133 {
134         int ret;
135
136         if ((ret = cursor->c_close(cursor))) {
137                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: c_close: %s\n",
138                         db_strerror(ret));
139                 abort();
140         }
141 }
142
143 static void bailIfCursor(DBC ** cursors, const char *msg)
144 {
145         int i;
146
147         for (i = 0; i < MAXCDB; i++)
148                 if (cursors[i] != NULL) {
149                         CtdlLogPrintf(CTDL_EMERG,
150                                 "cdb_*: cursor still in progress on cdb %02x: %s\n",
151                                 i, msg);
152                         abort();
153                 }
154 }
155
156 void check_handles(void *arg)
157 {
158         if (arg != NULL) {
159                 ThreadTSD *tsd = (ThreadTSD *) arg;
160
161                 bailIfCursor(tsd->cursors, "in check_handles");
162
163                 if (tsd->tid != NULL) {
164                         CtdlLogPrintf(CTDL_EMERG,
165                                 "cdb_*: transaction still in progress!");
166                         abort();
167                 }
168         }
169 }
170
171 void cdb_check_handles(void)
172 {
173         check_handles(pthread_getspecific(ThreadKey));
174 }
175
176
177 /*
178  * Cull the database logs
179  */
180 static void cdb_cull_logs(void)
181 {
182         u_int32_t flags;
183         int ret;
184         char **file, **list;
185         char errmsg[SIZ];
186
187         flags = DB_ARCH_ABS;
188
189         /* Get the list of names. */
190         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
191                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
192                 return;
193         }
194
195         /* Print the list of names. */
196         if (list != NULL) {
197                 for (file = list; *file != NULL; ++file) {
198                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
199                         ret = unlink(*file);
200                         if (ret != 0) {
201                                 snprintf(errmsg, sizeof(errmsg),
202                                          " ** ERROR **\n \n \n "
203                                          "Citadel was unable to delete the "
204                                          "database log file '%s' because of the "
205                                          "following error:\n \n %s\n \n"
206                                          " This log file is no longer in use "
207                                          "and may be safely deleted.\n",
208                                          *file, strerror(errno));
209                                 aide_message(errmsg, "Database Warning Message");
210                         }
211                 }
212                 free(list);
213         }
214 }
215
216 /*
217  * Manually initiate log file cull.
218  */
219 void cmd_cull(char *argbuf) {
220         if (CtdlAccessCheck(ac_internal)) return;
221         cdb_cull_logs();
222         cprintf("%d Database log file cull completed.\n", CIT_OK);
223 }
224
225
226 /*
227  * Request a checkpoint of the database.
228  */
229 void cdb_checkpoint(void)
230 {
231         int ret;
232 //      static time_t last_run = 0L;
233
234         /* Only do a checkpoint once per minute. */
235 /*
236  * Don't need this any more, since the thread that calls us sleeps for 60 seconds between calls
237  
238         if ((time(NULL) - last_run) < 60L) {
239                 return;
240         }
241         last_run = time(NULL);
242 */
243
244         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
245         ret = dbenv->txn_checkpoint(dbenv,
246                                     MAX_CHECKPOINT_KBYTES,
247                                     MAX_CHECKPOINT_MINUTES, 0);
248
249         if (ret != 0) {
250                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
251                         db_strerror(ret));
252                 abort();
253         }
254
255         /* After a successful checkpoint, we can cull the unused logs */
256         if (config.c_auto_cull) {
257                 cdb_cull_logs();
258         }
259 }
260
261
262
263 /*
264  * Open the various databases we'll be using.  Any database which
265  * does not exist should be created.  Note that we don't need a
266  * critical section here, because there aren't any active threads
267  * manipulating the database yet.
268  */
269 void open_databases(void)
270 {
271         int ret;
272         int i;
273         char dbfilename[SIZ];
274         u_int32_t flags = 0;
275         int dbversion_major, dbversion_minor, dbversion_patch;
276         int current_dbversion = 0;
277
278         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
279         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
280         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
281                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
282
283         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
284
285         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
286         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
287
288         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
289            && (CitControl.MMdbversion > current_dbversion) ) {
290                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
291                                         "of Berkeley DB that is older than that which last created or\n"
292                                         "updated the database.  Because this would probably cause data\n"
293                                         "corruption or loss, the server is aborting execution now.\n");
294                 exit(CTDLEXIT_DB);
295         }
296
297         CitControl.MMdbversion = current_dbversion;
298         put_control();
299
300 #ifdef HAVE_ZLIB
301         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
302 #endif
303
304         /*
305          * Silently try to create the database subdirectory.  If it's
306          * already there, no problem.
307          */
308         mkdir(ctdl_data_dir, 0700);
309         chmod(ctdl_data_dir, 0700);
310         chown(ctdl_data_dir, CTDLUID, (-1));
311
312         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
313         db_env_set_func_yield(sched_yield);
314         ret = db_env_create(&dbenv, 0);
315         if (ret) {
316                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n",
317                         db_strerror(ret));
318                 exit(CTDLEXIT_DB);
319         }
320         dbenv->set_errpfx(dbenv, "citserver");
321         dbenv->set_paniccall(dbenv, dbpanic);
322         dbenv->set_errcall(dbenv, cdb_verbose_err);
323         dbenv->set_errpfx(dbenv, "ctdl");
324 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
325         dbenv->set_msgcall(dbenv, cdb_verbose_log);
326 #endif
327         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
328         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
329
330         /*
331          * We want to specify the shared memory buffer pool cachesize,
332          * but everything else is the default.
333          */
334         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
335         if (ret) {
336                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n",
337                         db_strerror(ret));
338                 dbenv->close(dbenv, 0);
339                 exit(CTDLEXIT_DB);
340         }
341
342         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
343                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n",
344                         db_strerror(ret));
345                 dbenv->close(dbenv, 0);
346                 exit(CTDLEXIT_DB);
347         }
348
349         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
350         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
351         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
352         if (ret == DB_RUNRECOVERY) {
353                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
354                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
355                 flags |= DB_RECOVER;
356                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
357         }
358         if (ret == DB_RUNRECOVERY) {
359                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
360                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
361                 flags &= ~DB_RECOVER;
362                 flags |= DB_RECOVER_FATAL;
363                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
364         }
365         if (ret) {
366                 CtdlLogPrintf(CTDL_DEBUG, "dbenv->open: %s\n", db_strerror(ret));
367                 dbenv->close(dbenv, 0);
368                 exit(CTDLEXIT_DB);
369         }
370
371         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
372
373         for (i = 0; i < MAXCDB; ++i) {
374
375                 /* Create a database handle */
376                 ret = db_create(&dbp[i], dbenv, 0);
377                 if (ret) {
378                         CtdlLogPrintf(CTDL_DEBUG, "db_create: %s\n",
379                                 db_strerror(ret));
380                         exit(CTDLEXIT_DB);
381                 }
382
383
384                 /* Arbitrary names for our tables -- we reference them by
385                  * number, so we don't have string names for them.
386                  */
387                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
388
389                 ret = dbp[i]->open(dbp[i],
390                                    NULL,
391                                    dbfilename,
392                                    NULL,
393                                    DB_BTREE,
394                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
395                                    0600);
396                 if (ret) {
397                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i,
398                                 db_strerror(ret));
399                         exit(CTDLEXIT_DB);
400                 }
401         }
402
403 }
404
405
406 /* Make sure we own all the files, because in a few milliseconds
407  * we're going to drop root privs.
408  */
409 void cdb_chmod_data(void) {
410         DIR *dp;
411         struct dirent *d;
412         char filename[PATH_MAX];
413
414         dp = opendir(ctdl_data_dir);
415         if (dp != NULL) {
416                 while (d = readdir(dp), d != NULL) {
417                         if (d->d_name[0] != '.') {
418                                 snprintf(filename, sizeof filename,
419                                          "%s/%s", ctdl_data_dir, d->d_name);
420                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
421                                         filename, chmod(filename, 0600)
422                                 );
423                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
424                                         filename, chown(filename, CTDLUID, (-1))
425                                 );
426                         }
427                 }
428                 closedir(dp);
429         }
430
431         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
432
433         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
434 }
435
436
437 /*
438  * Close all of the db database files we've opened.  This can be done
439  * in a loop, since it's just a bunch of closes.
440  */
441 void close_databases(void)
442 {
443         int a;
444         int ret;
445
446         ctdl_thread_internal_free_tsd();
447         
448         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
449                 CtdlLogPrintf(CTDL_EMERG,
450                         "txn_checkpoint: %s\n", db_strerror(ret));
451         }
452
453         /* print some statistics... */
454 #ifdef DB_STAT_ALL
455         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
456 #endif
457
458         /* close the tables */
459         for (a = 0; a < MAXCDB; ++a) {
460                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
461                 ret = dbp[a]->close(dbp[a], 0);
462                 if (ret) {
463                         CtdlLogPrintf(CTDL_EMERG,
464                                 "db_close: %s\n", db_strerror(ret));
465                 }
466
467         }
468
469         /* Close the handle. */
470         ret = dbenv->close(dbenv, 0);
471         if (ret) {
472                 CtdlLogPrintf(CTDL_EMERG,
473                         "DBENV->close: %s\n", db_strerror(ret));
474         }
475 }
476
477
478 /*
479  * Compression functions only used if we have zlib
480  */
481 #ifdef HAVE_ZLIB
482
483 void cdb_decompress_if_necessary(struct cdbdata *cdb)
484 {
485         static int magic = COMPRESS_MAGIC;
486         struct CtdlCompressHeader zheader;
487         char *uncompressed_data;
488         char *compressed_data;
489         uLongf destLen, sourceLen;
490
491         if (cdb == NULL)
492                 return;
493         if (cdb->ptr == NULL)
494                 return;
495         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
496                 return;
497
498         /* At this point we know we're looking at a compressed item. */
499         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
500
501         compressed_data = cdb->ptr;
502         compressed_data += sizeof(struct CtdlCompressHeader);
503
504         sourceLen = (uLongf) zheader.compressed_len;
505         destLen = (uLongf) zheader.uncompressed_len;
506         uncompressed_data = malloc(zheader.uncompressed_len);
507
508         if (uncompress((Bytef *) uncompressed_data,
509                        (uLongf *) & destLen,
510                        (const Bytef *) compressed_data,
511                        (uLong) sourceLen) != Z_OK) {
512                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
513                 abort();
514         }
515
516         free(cdb->ptr);
517         cdb->len = (size_t) destLen;
518         cdb->ptr = uncompressed_data;
519 }
520
521 #endif                          /* HAVE_ZLIB */
522
523
524 /*
525  * Store a piece of data.  Returns 0 if the operation was successful.  If a
526  * key already exists it should be overwritten.
527  */
528 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
529 {
530
531         DBT dkey, ddata;
532         DB_TXN *tid;
533         int ret = 0;
534
535 #ifdef HAVE_ZLIB
536         struct CtdlCompressHeader zheader;
537         char *compressed_data = NULL;
538         int compressing = 0;
539         size_t buffer_len = 0;
540         uLongf destLen = 0;
541 #endif
542
543         memset(&dkey, 0, sizeof(DBT));
544         memset(&ddata, 0, sizeof(DBT));
545         dkey.size = ckeylen;
546         dkey.data = ckey;
547         ddata.size = cdatalen;
548         ddata.data = cdata;
549
550 #ifdef HAVE_ZLIB
551         /* Only compress Visit records.  Everything else is uncompressed. */
552         if (cdb == CDB_VISIT) {
553                 compressing = 1;
554                 zheader.magic = COMPRESS_MAGIC;
555                 zheader.uncompressed_len = cdatalen;
556                 buffer_len = ((cdatalen * 101) / 100) + 100
557                     + sizeof(struct CtdlCompressHeader);
558                 destLen = (uLongf) buffer_len;
559                 compressed_data = malloc(buffer_len);
560                 if (compress2((Bytef *) (compressed_data +
561                                          sizeof(struct
562                                                 CtdlCompressHeader)),
563                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
564                               1) != Z_OK) {
565                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
566                         abort();
567                 }
568                 zheader.compressed_len = (size_t) destLen;
569                 memcpy(compressed_data, &zheader,
570                        sizeof(struct CtdlCompressHeader));
571                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
572                                        zheader.compressed_len);
573                 ddata.data = compressed_data;
574         }
575 #endif
576
577         if (MYTID != NULL) {
578                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
579                                     MYTID,      /* transaction ID */
580                                     &dkey,      /* key */
581                                     &ddata,     /* data */
582                                     0); /* flags */
583                 if (ret) {
584                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
585                                 db_strerror(ret));
586                         abort();
587                 }
588 #ifdef HAVE_ZLIB
589                 if (compressing)
590                         free(compressed_data);
591 #endif
592                 return ret;
593
594         } else {
595                 bailIfCursor(MYCURSORS,
596                              "attempt to write during r/o cursor");
597
598               retry:
599                 txbegin(&tid);
600
601                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
602                                          tid,   /* transaction ID */
603                                          &dkey, /* key */
604                                          &ddata,        /* data */
605                                          0))) { /* flags */
606                         if (ret == DB_LOCK_DEADLOCK) {
607                                 txabort(tid);
608                                 goto retry;
609                         } else {
610                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
611                                         cdb, db_strerror(ret));
612                                 abort();
613                         }
614                 } else {
615                         txcommit(tid);
616 #ifdef HAVE_ZLIB
617                         if (compressing)
618                                 free(compressed_data);
619 #endif
620                         return ret;
621                 }
622         }
623 }
624
625
626 /*
627  * Delete a piece of data.  Returns 0 if the operation was successful.
628  */
629 int cdb_delete(int cdb, void *key, int keylen)
630 {
631
632         DBT dkey;
633         DB_TXN *tid;
634         int ret;
635
636         memset(&dkey, 0, sizeof dkey);
637         dkey.size = keylen;
638         dkey.data = key;
639
640         if (MYTID != NULL) {
641                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
642                 if (ret) {
643                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
644                                 db_strerror(ret));
645                         if (ret != DB_NOTFOUND)
646                                 abort();
647                 }
648         } else {
649                 bailIfCursor(MYCURSORS,
650                              "attempt to delete during r/o cursor");
651
652               retry:
653                 txbegin(&tid);
654
655                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
656                     && ret != DB_NOTFOUND) {
657                         if (ret == DB_LOCK_DEADLOCK) {
658                                 txabort(tid);
659                                 goto retry;
660                         } else {
661                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
662                                         cdb, db_strerror(ret));
663                                 abort();
664                         }
665                 } else {
666                         txcommit(tid);
667                 }
668         }
669         return ret;
670 }
671
672 static DBC *localcursor(int cdb)
673 {
674         int ret;
675         DBC *curs;
676
677         if (MYCURSORS[cdb] == NULL)
678                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
679         else
680                 ret =
681                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
682                                           DB_POSITION);
683
684         if (ret) {
685                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
686                 abort();
687         }
688
689         return curs;
690 }
691
692
693 /*
694  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
695  * a struct cdbdata which it is the caller's responsibility to free later on
696  * using the cdb_free() routine.
697  */
698 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
699 {
700
701         struct cdbdata *tempcdb;
702         DBT dkey, dret;
703         int ret;
704
705         memset(&dkey, 0, sizeof(DBT));
706         dkey.size = keylen;
707         dkey.data = key;
708
709         if (MYTID != NULL) {
710                 memset(&dret, 0, sizeof(DBT));
711                 dret.flags = DB_DBT_MALLOC;
712                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
713         } else {
714                 DBC *curs;
715
716                 do {
717                         memset(&dret, 0, sizeof(DBT));
718                         dret.flags = DB_DBT_MALLOC;
719
720                         curs = localcursor(cdb);
721
722                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
723                         cclose(curs);
724                 }
725                 while (ret == DB_LOCK_DEADLOCK);
726
727         }
728
729         if ((ret != 0) && (ret != DB_NOTFOUND)) {
730                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
731                         db_strerror(ret));
732                 abort();
733         }
734
735         if (ret != 0)
736                 return NULL;
737         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
738
739         if (tempcdb == NULL) {
740                 CtdlLogPrintf(CTDL_EMERG,
741                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
742                 abort();
743         }
744
745         tempcdb->len = dret.size;
746         tempcdb->ptr = dret.data;
747 #ifdef HAVE_ZLIB
748         cdb_decompress_if_necessary(tempcdb);
749 #endif
750         return (tempcdb);
751 }
752
753
754 /*
755  * Free a cdbdata item.
756  *
757  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
758  * other code to assume ownership of that memory simply by storing the
759  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
760  * avoid freeing it.
761  */
762 void cdb_free(struct cdbdata *cdb)
763 {
764         if (cdb->ptr) {
765                 free(cdb->ptr);
766         }
767         free(cdb);
768 }
769
770 void cdb_close_cursor(int cdb)
771 {
772         if (MYCURSORS[cdb] != NULL)
773                 cclose(MYCURSORS[cdb]);
774
775         MYCURSORS[cdb] = NULL;
776 }
777
778 /* 
779  * Prepare for a sequential search of an entire database.
780  * (There is guaranteed to be no more than one traversal in
781  * progress per thread at any given time.)
782  */
783 void cdb_rewind(int cdb)
784 {
785         int ret = 0;
786
787         if (MYCURSORS[cdb] != NULL) {
788                 CtdlLogPrintf(CTDL_EMERG,
789                         "cdb_rewind: must close cursor on database %d before reopening.\n",
790                         cdb);
791                 abort();
792                 /* cclose(MYCURSORS[cdb]); */
793         }
794
795         /*
796          * Now initialize the cursor
797          */
798         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
799         if (ret) {
800                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
801                         db_strerror(ret));
802                 abort();
803         }
804 }
805
806
807 /*
808  * Fetch the next item in a sequential search.  Returns a pointer to a 
809  * cdbdata structure, or NULL if we've hit the end.
810  */
811 struct cdbdata *cdb_next_item(int cdb)
812 {
813         DBT key, data;
814         struct cdbdata *cdbret;
815         int ret = 0;
816
817         /* Initialize the key/data pair so the flags aren't set. */
818         memset(&key, 0, sizeof(key));
819         memset(&data, 0, sizeof(data));
820         data.flags = DB_DBT_MALLOC;
821
822         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
823
824         if (ret) {
825                 if (ret != DB_NOTFOUND) {
826                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
827                                 cdb, db_strerror(ret));
828                         abort();
829                 }
830                 cclose(MYCURSORS[cdb]);
831                 MYCURSORS[cdb] = NULL;
832                 return NULL;    /* presumably, end of file */
833         }
834
835         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
836         cdbret->len = data.size;
837         cdbret->ptr = data.data;
838 #ifdef HAVE_ZLIB
839         cdb_decompress_if_necessary(cdbret);
840 #endif
841
842         return (cdbret);
843 }
844
845
846
847 /*
848  * Transaction-based stuff.  I'm writing this as I bake cookies...
849  */
850
851 void cdb_begin_transaction(void)
852 {
853
854         bailIfCursor(MYCURSORS,
855                      "can't begin transaction during r/o cursor");
856
857         if (MYTID != NULL) {
858                 CtdlLogPrintf(CTDL_EMERG,
859                         "cdb_begin_transaction: ERROR: nested transaction\n");
860                 abort();
861         }
862
863         txbegin(&MYTID);
864 }
865
866 void cdb_end_transaction(void)
867 {
868         int i;
869
870         for (i = 0; i < MAXCDB; i++)
871                 if (MYCURSORS[i] != NULL) {
872                         CtdlLogPrintf(CTDL_WARNING,
873                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
874                                 i);
875                         cclose(MYCURSORS[i]);
876                         MYCURSORS[i] = NULL;
877                 }
878
879         if (MYTID == NULL) {
880                 CtdlLogPrintf(CTDL_EMERG,
881                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
882                 abort();
883         } else
884                 txcommit(MYTID);
885
886         MYTID = NULL;
887 }
888
889 /*
890  * Truncate (delete every record)
891  */
892 void cdb_trunc(int cdb)
893 {
894         /* DB_TXN *tid; */
895         int ret;
896         u_int32_t count;
897
898         if (MYTID != NULL) {
899                 CtdlLogPrintf(CTDL_EMERG,
900                         "cdb_trunc must not be called in a transaction.\n");
901                 abort();
902         } else {
903                 bailIfCursor(MYCURSORS,
904                              "attempt to write during r/o cursor");
905
906               retry:
907                 /* txbegin(&tid); */
908
909                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
910                                               NULL,     /* transaction ID */
911                                               &count,   /* #rows deleted */
912                                               0))) {    /* flags */
913                         if (ret == DB_LOCK_DEADLOCK) {
914                                 /* txabort(tid); */
915                                 goto retry;
916                         } else {
917                                 CtdlLogPrintf(CTDL_EMERG,
918                                         "cdb_truncate(%d): %s\n", cdb,
919                                         db_strerror(ret));
920                                 abort();
921                         }
922                 } else {
923                         /* txcommit(tid); */
924                 }
925         }
926 }