cfa60460a54cca3b3bbf43ca2d5dfa0c0c5e1878
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <pthread.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "serv_extensions.h"
50 #include "citserver.h"
51 #include "database.h"
52 #include "msgbase.h"
53 #include "sysdep_decls.h"
54 #include "config.h"
55
56 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
57 static DB_ENV *dbenv;           /* The DB environment (global) */
58
59 struct cdbtsd {                 /* Thread-specific DB stuff */
60         DB_TXN *tid;            /* Transaction handle */
61         DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
62 };
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68 static pthread_key_t tsdkey;
69
70 #define MYCURSORS       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors)
71 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
72
73 /* Verbose logging callback */
74 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
75 {
76         lprintf(CTDL_DEBUG, "BDB: %s\n", msg);
77 }
78
79
80 /* Verbose logging callback */
81 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
82 {
83         lprintf(CTDL_ALERT, "BDB: %s\n", msg);
84 }
85
86
87 /* just a little helper function */
88 static void txabort(DB_TXN * tid)
89 {
90         int ret;
91
92         ret = tid->abort(tid);
93
94         if (ret) {
95                 lprintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
96                         db_strerror(ret));
97                 abort();
98         }
99 }
100
101 /* this one is even more helpful than the last. */
102 static void txcommit(DB_TXN * tid)
103 {
104         int ret;
105
106         ret = tid->commit(tid, 0);
107
108         if (ret) {
109                 lprintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n",
110                         db_strerror(ret));
111                 abort();
112         }
113 }
114
115 /* are you sensing a pattern yet? */
116 static void txbegin(DB_TXN ** tid)
117 {
118         int ret;
119
120         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
121
122         if (ret) {
123                 lprintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n",
124                         db_strerror(ret));
125                 abort();
126         }
127 }
128
129 static void dbpanic(DB_ENV * env, int errval)
130 {
131         lprintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
132 }
133
134 static void cclose(DBC * cursor)
135 {
136         int ret;
137
138         if ((ret = cursor->c_close(cursor))) {
139                 lprintf(CTDL_EMERG, "cdb_*: c_close: %s\n",
140                         db_strerror(ret));
141                 abort();
142         }
143 }
144
145 static void bailIfCursor(DBC ** cursors, const char *msg)
146 {
147         int i;
148
149         for (i = 0; i < MAXCDB; i++)
150                 if (cursors[i] != NULL) {
151                         lprintf(CTDL_EMERG,
152                                 "cdb_*: cursor still in progress on cdb %d: %s\n",
153                                 i, msg);
154                         abort();
155                 }
156 }
157
158 static void check_handles(void *arg)
159 {
160         if (arg != NULL) {
161                 struct cdbtsd *tsd = (struct cdbtsd *) arg;
162
163                 bailIfCursor(tsd->cursors, "in check_handles");
164
165                 if (tsd->tid != NULL) {
166                         lprintf(CTDL_EMERG,
167                                 "cdb_*: transaction still in progress!");
168                         abort();
169                 }
170         }
171 }
172
173 static void dest_tsd(void *arg)
174 {
175         if (arg != NULL) {
176                 check_handles(arg);
177                 free(arg);
178         }
179 }
180
181 /*
182  * Ensure that we have a key for thread-specific data.  We don't
183  * put anything in here that Citadel cares about; this is just database
184  * related stuff like cursors and transactions.
185  *
186  * This should be called immediately after startup by any thread which wants
187  * to use database calls, except for whatever thread calls open_databases.
188  */
189 void cdb_allocate_tsd(void)
190 {
191         struct cdbtsd *tsd;
192
193         if (pthread_getspecific(tsdkey) != NULL)
194                 return;
195
196         tsd = malloc(sizeof(struct cdbtsd));
197
198         tsd->tid = NULL;
199
200         memset(tsd->cursors, 0, sizeof tsd->cursors);
201         pthread_setspecific(tsdkey, tsd);
202 }
203
204 void cdb_free_tsd(void)
205 {
206         dest_tsd(pthread_getspecific(tsdkey));
207         pthread_setspecific(tsdkey, NULL);
208 }
209
210 void cdb_check_handles(void)
211 {
212         check_handles(pthread_getspecific(tsdkey));
213 }
214
215
216 /*
217  * Reclaim unused space in the databases.  We need to do each one of
218  * these discretely, rather than in a loop.
219  *
220  * This is a stub function in the Sleepycat DB backend, because there is no
221  * such API call available.
222  */
223 void defrag_databases(void)
224 {
225         /* do nothing */
226 }
227
228
229 /*
230  * Cull the database logs
231  */
232 static void cdb_cull_logs(void)
233 {
234         u_int32_t flags;
235         int ret;
236         char **file, **list;
237         char errmsg[SIZ];
238
239         flags = DB_ARCH_ABS;
240
241         /* Get the list of names. */
242         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
243                 lprintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
244                 return;
245         }
246
247         /* Print the list of names. */
248         if (list != NULL) {
249                 for (file = list; *file != NULL; ++file) {
250                         lprintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
251                         ret = unlink(*file);
252                         if (ret != 0) {
253                                 snprintf(errmsg, sizeof(errmsg),
254                                          " ** ERROR **\n \n \n "
255                                          "Citadel was unable to delete the "
256                                          "database log file '%s' because of the "
257                                          "following error:\n \n %s\n \n"
258                                          " This log file is no longer in use "
259                                          "and may be safely deleted.\n",
260                                          *file, strerror(errno));
261                                 aide_message(errmsg, "Database Warning Message");
262                         }
263                 }
264                 free(list);
265         }
266 }
267
268 /*
269  * Manually initiate log file cull.
270  */
271 void cmd_cull(char *argbuf) {
272         if (CtdlAccessCheck(ac_internal)) return;
273         cdb_cull_logs();
274         cprintf("%d Database log file cull completed.\n", CIT_OK);
275 }
276
277
278 /*
279  * Request a checkpoint of the database.
280  */
281 static void cdb_checkpoint(void)
282 {
283         int ret;
284         static time_t last_run = 0L;
285
286         /* Only do a checkpoint once per minute. */
287         if ((time(NULL) - last_run) < 60L) {
288                 return;
289         }
290         last_run = time(NULL);
291
292         lprintf(CTDL_DEBUG, "-- db checkpoint --\n");
293         ret = dbenv->txn_checkpoint(dbenv,
294                                     MAX_CHECKPOINT_KBYTES,
295                                     MAX_CHECKPOINT_MINUTES, 0);
296
297         if (ret != 0) {
298                 lprintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
299                         db_strerror(ret));
300                 abort();
301         }
302
303         /* After a successful checkpoint, we can cull the unused logs */
304         if (config.c_auto_cull) {
305                 cdb_cull_logs();
306         }
307 }
308
309
310 /*
311  * Main loop for the checkpoint thread.
312  */
313 void *checkpoint_thread(void *arg) {
314         struct CitContext checkpointCC;
315
316         lprintf(CTDL_DEBUG, "checkpoint_thread() initializing\n");
317
318         memset(&checkpointCC, 0, sizeof(struct CitContext));
319         checkpointCC.internal_pgm = 1;
320         checkpointCC.cs_pid = 0;
321         pthread_setspecific(MyConKey, (void *)&checkpointCC );
322
323         cdb_allocate_tsd();
324
325         while (!time_to_die) {
326                 cdb_checkpoint();
327                 sleep(1);
328         }
329
330         lprintf(CTDL_DEBUG, "checkpoint_thread() exiting\n");
331         pthread_exit(NULL);
332 }
333
334 /*
335  * Open the various databases we'll be using.  Any database which
336  * does not exist should be created.  Note that we don't need a
337  * critical section here, because there aren't any active threads
338  * manipulating the database yet.
339  */
340 void open_databases(void)
341 {
342         int ret;
343         int i;
344         char dbfilename[SIZ];
345         u_int32_t flags = 0;
346         DIR *dp;
347         struct dirent *d;
348         char filename[PATH_MAX];
349
350         lprintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
351         lprintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
352         lprintf(CTDL_INFO, "  Linked db: %s\n",
353                 db_version(NULL, NULL, NULL));
354 #ifdef HAVE_ZLIB
355         lprintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
356 #endif
357
358         /*
359          * Silently try to create the database subdirectory.  If it's
360          * already there, no problem.
361          */
362         mkdir(ctdl_data_dir, 0700);
363         chmod(ctdl_data_dir, 0700);
364         chown(ctdl_data_dir, CTDLUID, (-1));
365
366         lprintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
367         db_env_set_func_yield(sched_yield);
368         ret = db_env_create(&dbenv, 0);
369         if (ret) {
370                 lprintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n",
371                         db_strerror(ret));
372                 exit(CTDLEXIT_DB);
373         }
374         dbenv->set_errpfx(dbenv, "citserver");
375         dbenv->set_paniccall(dbenv, dbpanic);
376         dbenv->set_errcall(dbenv, cdb_verbose_err);
377         dbenv->set_errpfx(dbenv, "ctdl");
378 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
379         dbenv->set_msgcall(dbenv, cdb_verbose_log);
380 #endif
381         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
382         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
383
384         /*
385          * We want to specify the shared memory buffer pool cachesize,
386          * but everything else is the default.
387          */
388         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
389         if (ret) {
390                 lprintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n",
391                         db_strerror(ret));
392                 dbenv->close(dbenv, 0);
393                 exit(CTDLEXIT_DB);
394         }
395
396         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
397                 lprintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n",
398                         db_strerror(ret));
399                 dbenv->close(dbenv, 0);
400                 exit(CTDLEXIT_DB);
401         }
402
403         flags =
404             DB_CREATE | DB_RECOVER | DB_INIT_MPOOL |
405             DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD;
406         lprintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n",
407                 ctdl_data_dir, flags);
408         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
409         if (ret) {
410                 lprintf(CTDL_DEBUG, "cdb_*: dbenv->open: %s\n",
411                         db_strerror(ret));
412                 dbenv->close(dbenv, 0);
413                 exit(CTDLEXIT_DB);
414         }
415
416         lprintf(CTDL_INFO, "cdb_*: Starting up DB\n");
417
418         for (i = 0; i < MAXCDB; ++i) {
419
420                 /* Create a database handle */
421                 ret = db_create(&dbp[i], dbenv, 0);
422                 if (ret) {
423                         lprintf(CTDL_DEBUG, "cdb_*: db_create: %s\n",
424                                 db_strerror(ret));
425                         exit(CTDLEXIT_DB);
426                 }
427
428
429                 /* Arbitrary names for our tables -- we reference them by
430                  * number, so we don't have string names for them.
431                  */
432                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
433
434                 ret = dbp[i]->open(dbp[i],
435                                    NULL,
436                                    dbfilename,
437                                    NULL,
438                                    DB_BTREE,
439                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
440                                    0600);
441                 if (ret) {
442                         lprintf(CTDL_EMERG, "cdb_*: db_open[%d]: %s\n", i,
443                                 db_strerror(ret));
444                         exit(CTDLEXIT_DB);
445                 }
446         }
447
448         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
449                 lprintf(CTDL_EMERG, "cdb_*: pthread_key_create: %s\n",
450                         strerror(ret));
451                 exit(CTDLEXIT_DB);
452         }
453
454         cdb_allocate_tsd();
455
456         /* Now make sure we own all the files, because in a few milliseconds
457          * we're going to drop root privs.
458          */
459         dp = opendir(ctdl_data_dir);
460         if (dp != NULL) {
461                 while (d = readdir(dp), d != NULL) {
462                         if (d->d_name[0] != '.') {
463                                 snprintf(filename, sizeof filename,
464                                          "%s/%s", ctdl_data_dir, d->d_name);
465                                 chmod(filename, 0600);
466                                 chown(filename, CTDLUID, (-1));
467                         }
468                 }
469                 closedir(dp);
470         }
471
472         lprintf(CTDL_DEBUG, "cdb_*: open_databases() finished\n");
473
474         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
475 }
476
477
478 /*
479  * Close all of the db database files we've opened.  This can be done
480  * in a loop, since it's just a bunch of closes.
481  */
482 void close_databases(void)
483 {
484         int a;
485         int ret;
486
487         cdb_free_tsd();
488
489         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
490                 lprintf(CTDL_EMERG,
491                         "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
492         }
493
494         for (a = 0; a < MAXCDB; ++a) {
495                 lprintf(CTDL_INFO, "cdb_*: Closing database %d\n", a);
496                 ret = dbp[a]->close(dbp[a], 0);
497                 if (ret) {
498                         lprintf(CTDL_EMERG,
499                                 "cdb_*: db_close: %s\n", db_strerror(ret));
500                 }
501
502         }
503
504         /* Close the handle. */
505         ret = dbenv->close(dbenv, 0);
506         if (ret) {
507                 lprintf(CTDL_EMERG,
508                         "cdb_*: DBENV->close: %s\n", db_strerror(ret));
509         }
510 }
511
512
513 /*
514  * Compression functions only used if we have zlib
515  */
516 #ifdef HAVE_ZLIB
517
518 void cdb_decompress_if_necessary(struct cdbdata *cdb)
519 {
520         static int magic = COMPRESS_MAGIC;
521         struct CtdlCompressHeader zheader;
522         char *uncompressed_data;
523         char *compressed_data;
524         uLongf destLen, sourceLen;
525
526         if (cdb == NULL)
527                 return;
528         if (cdb->ptr == NULL)
529                 return;
530         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
531                 return;
532
533         /* At this point we know we're looking at a compressed item. */
534         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
535
536         compressed_data = cdb->ptr;
537         compressed_data += sizeof(struct CtdlCompressHeader);
538
539         sourceLen = (uLongf) zheader.compressed_len;
540         destLen = (uLongf) zheader.uncompressed_len;
541         uncompressed_data = malloc(zheader.uncompressed_len);
542
543         if (uncompress((Bytef *) uncompressed_data,
544                        (uLongf *) & destLen,
545                        (const Bytef *) compressed_data,
546                        (uLong) sourceLen) != Z_OK) {
547                 lprintf(CTDL_EMERG, "uncompress() error\n");
548                 abort();
549         }
550
551         free(cdb->ptr);
552         cdb->len = (size_t) destLen;
553         cdb->ptr = uncompressed_data;
554 }
555
556 #endif                          /* HAVE_ZLIB */
557
558
559 /*
560  * Store a piece of data.  Returns 0 if the operation was successful.  If a
561  * key already exists it should be overwritten.
562  */
563 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
564 {
565
566         DBT dkey, ddata;
567         DB_TXN *tid;
568         int ret = 0;
569
570 #ifdef HAVE_ZLIB
571         struct CtdlCompressHeader zheader;
572         char *compressed_data = NULL;
573         int compressing = 0;
574         size_t buffer_len = 0;
575         uLongf destLen = 0;
576 #endif
577
578         memset(&dkey, 0, sizeof(DBT));
579         memset(&ddata, 0, sizeof(DBT));
580         dkey.size = ckeylen;
581         dkey.data = ckey;
582         ddata.size = cdatalen;
583         ddata.data = cdata;
584
585 #ifdef HAVE_ZLIB
586         /* Only compress Visit records.  Everything else is uncompressed. */
587         if (cdb == CDB_VISIT) {
588                 compressing = 1;
589                 zheader.magic = COMPRESS_MAGIC;
590                 zheader.uncompressed_len = cdatalen;
591                 buffer_len = ((cdatalen * 101) / 100) + 100
592                     + sizeof(struct CtdlCompressHeader);
593                 destLen = (uLongf) buffer_len;
594                 compressed_data = malloc(buffer_len);
595                 if (compress2((Bytef *) (compressed_data +
596                                          sizeof(struct
597                                                 CtdlCompressHeader)),
598                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
599                               1) != Z_OK) {
600                         lprintf(CTDL_EMERG, "compress2() error\n");
601                         abort();
602                 }
603                 zheader.compressed_len = (size_t) destLen;
604                 memcpy(compressed_data, &zheader,
605                        sizeof(struct CtdlCompressHeader));
606                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
607                                        zheader.compressed_len);
608                 ddata.data = compressed_data;
609         }
610 #endif
611
612         if (MYTID != NULL) {
613                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
614                                     MYTID,      /* transaction ID */
615                                     &dkey,      /* key */
616                                     &ddata,     /* data */
617                                     0); /* flags */
618                 if (ret) {
619                         lprintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
620                                 db_strerror(ret));
621                         abort();
622                 }
623 #ifdef HAVE_ZLIB
624                 if (compressing)
625                         free(compressed_data);
626 #endif
627                 return ret;
628
629         } else {
630                 bailIfCursor(MYCURSORS,
631                              "attempt to write during r/o cursor");
632
633               retry:
634                 txbegin(&tid);
635
636                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
637                                          tid,   /* transaction ID */
638                                          &dkey, /* key */
639                                          &ddata,        /* data */
640                                          0))) { /* flags */
641                         if (ret == DB_LOCK_DEADLOCK) {
642                                 txabort(tid);
643                                 goto retry;
644                         } else {
645                                 lprintf(CTDL_EMERG, "cdb_store(%d): %s\n",
646                                         cdb, db_strerror(ret));
647                                 abort();
648                         }
649                 } else {
650                         txcommit(tid);
651 #ifdef HAVE_ZLIB
652                         if (compressing)
653                                 free(compressed_data);
654 #endif
655                         return ret;
656                 }
657         }
658 }
659
660
661 /*
662  * Delete a piece of data.  Returns 0 if the operation was successful.
663  */
664 int cdb_delete(int cdb, void *key, int keylen)
665 {
666
667         DBT dkey;
668         DB_TXN *tid;
669         int ret;
670
671         memset(&dkey, 0, sizeof dkey);
672         dkey.size = keylen;
673         dkey.data = key;
674
675         if (MYTID != NULL) {
676                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
677                 if (ret) {
678                         lprintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
679                                 db_strerror(ret));
680                         if (ret != DB_NOTFOUND)
681                                 abort();
682                 }
683         } else {
684                 bailIfCursor(MYCURSORS,
685                              "attempt to delete during r/o cursor");
686
687               retry:
688                 txbegin(&tid);
689
690                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
691                     && ret != DB_NOTFOUND) {
692                         if (ret == DB_LOCK_DEADLOCK) {
693                                 txabort(tid);
694                                 goto retry;
695                         } else {
696                                 lprintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
697                                         cdb, db_strerror(ret));
698                                 abort();
699                         }
700                 } else {
701                         txcommit(tid);
702                 }
703         }
704         return ret;
705 }
706
707 static DBC *localcursor(int cdb)
708 {
709         int ret;
710         DBC *curs;
711
712         if (MYCURSORS[cdb] == NULL)
713                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
714         else
715                 ret =
716                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
717                                           DB_POSITION);
718
719         if (ret) {
720                 lprintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
721                 abort();
722         }
723
724         return curs;
725 }
726
727
728 /*
729  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
730  * a struct cdbdata which it is the caller's responsibility to free later on
731  * using the cdb_free() routine.
732  */
733 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
734 {
735
736         struct cdbdata *tempcdb;
737         DBT dkey, dret;
738         int ret;
739
740         memset(&dkey, 0, sizeof(DBT));
741         dkey.size = keylen;
742         dkey.data = key;
743
744         if (MYTID != NULL) {
745                 memset(&dret, 0, sizeof(DBT));
746                 dret.flags = DB_DBT_MALLOC;
747                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
748         } else {
749                 DBC *curs;
750
751                 do {
752                         memset(&dret, 0, sizeof(DBT));
753                         dret.flags = DB_DBT_MALLOC;
754
755                         curs = localcursor(cdb);
756
757                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
758                         cclose(curs);
759                 }
760                 while (ret == DB_LOCK_DEADLOCK);
761
762         }
763
764         if ((ret != 0) && (ret != DB_NOTFOUND)) {
765                 lprintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
766                         db_strerror(ret));
767                 abort();
768         }
769
770         if (ret != 0)
771                 return NULL;
772         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
773
774         if (tempcdb == NULL) {
775                 lprintf(CTDL_EMERG,
776                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
777                 abort();
778         }
779
780         tempcdb->len = dret.size;
781         tempcdb->ptr = dret.data;
782 #ifdef HAVE_ZLIB
783         cdb_decompress_if_necessary(tempcdb);
784 #endif
785         return (tempcdb);
786 }
787
788
789 /*
790  * Free a cdbdata item.
791  *
792  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
793  * other code to assume ownership of that memory simply by storing the
794  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
795  * avoid freeing it.
796  */
797 void cdb_free(struct cdbdata *cdb)
798 {
799         if (cdb->ptr) {
800                 free(cdb->ptr);
801         }
802         free(cdb);
803 }
804
805 void cdb_close_cursor(int cdb)
806 {
807         if (MYCURSORS[cdb] != NULL)
808                 cclose(MYCURSORS[cdb]);
809
810         MYCURSORS[cdb] = NULL;
811 }
812
813 /* 
814  * Prepare for a sequential search of an entire database.
815  * (There is guaranteed to be no more than one traversal in
816  * progress per thread at any given time.)
817  */
818 void cdb_rewind(int cdb)
819 {
820         int ret = 0;
821
822         if (MYCURSORS[cdb] != NULL) {
823                 lprintf(CTDL_EMERG,
824                         "cdb_rewind: must close cursor on database %d before reopening.\n",
825                         cdb);
826                 abort();
827                 /* cclose(MYCURSORS[cdb]); */
828         }
829
830         /*
831          * Now initialize the cursor
832          */
833         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
834         if (ret) {
835                 lprintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
836                         db_strerror(ret));
837                 abort();
838         }
839 }
840
841
842 /*
843  * Fetch the next item in a sequential search.  Returns a pointer to a 
844  * cdbdata structure, or NULL if we've hit the end.
845  */
846 struct cdbdata *cdb_next_item(int cdb)
847 {
848         DBT key, data;
849         struct cdbdata *cdbret;
850         int ret = 0;
851
852         /* Initialize the key/data pair so the flags aren't set. */
853         memset(&key, 0, sizeof(key));
854         memset(&data, 0, sizeof(data));
855         data.flags = DB_DBT_MALLOC;
856
857         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
858
859         if (ret) {
860                 if (ret != DB_NOTFOUND) {
861                         lprintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
862                                 cdb, db_strerror(ret));
863                         abort();
864                 }
865                 cclose(MYCURSORS[cdb]);
866                 MYCURSORS[cdb] = NULL;
867                 return NULL;    /* presumably, end of file */
868         }
869
870         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
871         cdbret->len = data.size;
872         cdbret->ptr = data.data;
873 #ifdef HAVE_ZLIB
874         cdb_decompress_if_necessary(cdbret);
875 #endif
876
877         return (cdbret);
878 }
879
880
881
882 /*
883  * Transaction-based stuff.  I'm writing this as I bake cookies...
884  */
885
886 void cdb_begin_transaction(void)
887 {
888
889         bailIfCursor(MYCURSORS,
890                      "can't begin transaction during r/o cursor");
891
892         if (MYTID != NULL) {
893                 lprintf(CTDL_EMERG,
894                         "cdb_begin_transaction: ERROR: nested transaction\n");
895                 abort();
896         }
897
898         txbegin(&MYTID);
899 }
900
901 void cdb_end_transaction(void)
902 {
903         int i;
904
905         for (i = 0; i < MAXCDB; i++)
906                 if (MYCURSORS[i] != NULL) {
907                         lprintf(CTDL_WARNING,
908                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
909                                 i);
910                         cclose(MYCURSORS[i]);
911                         MYCURSORS[i] = NULL;
912                 }
913
914         if (MYTID == NULL) {
915                 lprintf(CTDL_EMERG,
916                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
917                 abort();
918         } else
919                 txcommit(MYTID);
920
921         MYTID = NULL;
922 }
923
924 /*
925  * Truncate (delete every record)
926  */
927 void cdb_trunc(int cdb)
928 {
929         /* DB_TXN *tid; */
930         int ret;
931         u_int32_t count;
932
933         if (MYTID != NULL) {
934                 lprintf(CTDL_EMERG,
935                         "cdb_trunc must not be called in a transaction.\n");
936                 abort();
937         } else {
938                 bailIfCursor(MYCURSORS,
939                              "attempt to write during r/o cursor");
940
941               retry:
942                 /* txbegin(&tid); */
943
944                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
945                                               NULL,     /* transaction ID */
946                                               &count,   /* #rows deleted */
947                                               0))) {    /* flags */
948                         if (ret == DB_LOCK_DEADLOCK) {
949                                 /* txabort(tid); */
950                                 goto retry;
951                         } else {
952                                 lprintf(CTDL_EMERG,
953                                         "cdb_truncate(%d): %s\n", cdb,
954                                         db_strerror(ret));
955                                 abort();
956                         }
957                 } else {
958                         /* txcommit(tid); */
959                 }
960         }
961 }