Remember the last version of Berkeley DB that touched the
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55 #include "control.h"
56
57 #include "ctdl_module.h"
58
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
71 {
72         CtdlLogPrintf(CTDL_DEBUG, "BDB: %s\n", msg);
73 }
74
75
76 /* Verbose logging callback */
77 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
78 {
79         CtdlLogPrintf(CTDL_ALERT, "BDB: %s\n", msg);
80 }
81
82
83 /* just a little helper function */
84 static void txabort(DB_TXN * tid)
85 {
86         int ret;
87
88         ret = tid->abort(tid);
89
90         if (ret) {
91                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
92                         db_strerror(ret));
93                 abort();
94         }
95 }
96
97 /* this one is even more helpful than the last. */
98 static void txcommit(DB_TXN * tid)
99 {
100         int ret;
101
102         ret = tid->commit(tid, 0);
103
104         if (ret) {
105                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n",
106                         db_strerror(ret));
107                 abort();
108         }
109 }
110
111 /* are you sensing a pattern yet? */
112 static void txbegin(DB_TXN ** tid)
113 {
114         int ret;
115
116         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
117
118         if (ret) {
119                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n",
120                         db_strerror(ret));
121                 abort();
122         }
123 }
124
125 static void dbpanic(DB_ENV * env, int errval)
126 {
127         CtdlLogPrintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
128 }
129
130 static void cclose(DBC * cursor)
131 {
132         int ret;
133
134         if ((ret = cursor->c_close(cursor))) {
135                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: c_close: %s\n",
136                         db_strerror(ret));
137                 abort();
138         }
139 }
140
141 static void bailIfCursor(DBC ** cursors, const char *msg)
142 {
143         int i;
144
145         for (i = 0; i < MAXCDB; i++)
146                 if (cursors[i] != NULL) {
147                         CtdlLogPrintf(CTDL_EMERG,
148                                 "cdb_*: cursor still in progress on cdb %d: %s\n",
149                                 i, msg);
150                         abort();
151                 }
152 }
153
154 void check_handles(void *arg)
155 {
156         if (arg != NULL) {
157                 ThreadTSD *tsd = (ThreadTSD *) arg;
158
159                 bailIfCursor(tsd->cursors, "in check_handles");
160
161                 if (tsd->tid != NULL) {
162                         CtdlLogPrintf(CTDL_EMERG,
163                                 "cdb_*: transaction still in progress!");
164                         abort();
165                 }
166         }
167 }
168
169 void cdb_check_handles(void)
170 {
171         check_handles(pthread_getspecific(ThreadKey));
172 }
173
174
175 /*
176  * Cull the database logs
177  */
178 static void cdb_cull_logs(void)
179 {
180         u_int32_t flags;
181         int ret;
182         char **file, **list;
183         char errmsg[SIZ];
184
185         flags = DB_ARCH_ABS;
186
187         /* Get the list of names. */
188         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
189                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
190                 return;
191         }
192
193         /* Print the list of names. */
194         if (list != NULL) {
195                 for (file = list; *file != NULL; ++file) {
196                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
197                         ret = unlink(*file);
198                         if (ret != 0) {
199                                 snprintf(errmsg, sizeof(errmsg),
200                                          " ** ERROR **\n \n \n "
201                                          "Citadel was unable to delete the "
202                                          "database log file '%s' because of the "
203                                          "following error:\n \n %s\n \n"
204                                          " This log file is no longer in use "
205                                          "and may be safely deleted.\n",
206                                          *file, strerror(errno));
207                                 aide_message(errmsg, "Database Warning Message");
208                         }
209                 }
210                 free(list);
211         }
212 }
213
214 /*
215  * Manually initiate log file cull.
216  */
217 void cmd_cull(char *argbuf) {
218         if (CtdlAccessCheck(ac_internal)) return;
219         cdb_cull_logs();
220         cprintf("%d Database log file cull completed.\n", CIT_OK);
221 }
222
223
224 /*
225  * Request a checkpoint of the database.
226  */
227 void cdb_checkpoint(void)
228 {
229         int ret;
230 //      static time_t last_run = 0L;
231
232         /* Only do a checkpoint once per minute. */
233 /*
234  * Don't need this any more, since the thread that calls us sleeps for 60 seconds between calls
235  
236         if ((time(NULL) - last_run) < 60L) {
237                 return;
238         }
239         last_run = time(NULL);
240 */
241
242         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
243         ret = dbenv->txn_checkpoint(dbenv,
244                                     MAX_CHECKPOINT_KBYTES,
245                                     MAX_CHECKPOINT_MINUTES, 0);
246
247         if (ret != 0) {
248                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
249                         db_strerror(ret));
250                 abort();
251         }
252
253         /* After a successful checkpoint, we can cull the unused logs */
254         if (config.c_auto_cull) {
255                 cdb_cull_logs();
256         }
257 }
258
259
260
261 /*
262  * Open the various databases we'll be using.  Any database which
263  * does not exist should be created.  Note that we don't need a
264  * critical section here, because there aren't any active threads
265  * manipulating the database yet.
266  */
267 void open_databases(void)
268 {
269         int ret;
270         int i;
271         char dbfilename[SIZ];
272         u_int32_t flags = 0;
273         int dbversion_major, dbversion_minor, dbversion_patch;
274         int current_dbversion = 0;
275
276         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
277         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
278         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
279                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
280
281         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
282
283         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
284         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
285
286         if (CitControl.MMdbversion > current_dbversion) {
287                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
288                                         "of Berkeley DB that is older than that which last created or\n"
289                                         "updated the database.  Because this would probably cause data\n"
290                                         "corruption or loss, the server is aborting execution now.\n");
291                 exit(CTDLEXIT_DB);
292         }
293
294         CitControl.MMdbversion = current_dbversion;
295         put_control();
296
297 #ifdef HAVE_ZLIB
298         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
299 #endif
300
301         /*
302          * Silently try to create the database subdirectory.  If it's
303          * already there, no problem.
304          */
305         mkdir(ctdl_data_dir, 0700);
306         chmod(ctdl_data_dir, 0700);
307         chown(ctdl_data_dir, CTDLUID, (-1));
308
309         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
310         db_env_set_func_yield(sched_yield);
311         ret = db_env_create(&dbenv, 0);
312         if (ret) {
313                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n",
314                         db_strerror(ret));
315                 exit(CTDLEXIT_DB);
316         }
317         dbenv->set_errpfx(dbenv, "citserver");
318         dbenv->set_paniccall(dbenv, dbpanic);
319         dbenv->set_errcall(dbenv, cdb_verbose_err);
320         dbenv->set_errpfx(dbenv, "ctdl");
321 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
322         dbenv->set_msgcall(dbenv, cdb_verbose_log);
323 #endif
324         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
325         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
326
327         /*
328          * We want to specify the shared memory buffer pool cachesize,
329          * but everything else is the default.
330          */
331         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
332         if (ret) {
333                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n",
334                         db_strerror(ret));
335                 dbenv->close(dbenv, 0);
336                 exit(CTDLEXIT_DB);
337         }
338
339         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
340                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n",
341                         db_strerror(ret));
342                 dbenv->close(dbenv, 0);
343                 exit(CTDLEXIT_DB);
344         }
345
346         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
347         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
348         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
349         if (ret == DB_RUNRECOVERY) {
350                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
351                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
352                 flags |= DB_RECOVER;
353                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
354         }
355         if (ret == DB_RUNRECOVERY) {
356                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
357                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
358                 flags &= ~DB_RECOVER;
359                 flags |= DB_RECOVER_FATAL;
360                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
361         }
362         if (ret) {
363                 CtdlLogPrintf(CTDL_DEBUG, "dbenv->open: %s\n", db_strerror(ret));
364                 dbenv->close(dbenv, 0);
365                 exit(CTDLEXIT_DB);
366         }
367
368         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
369
370         for (i = 0; i < MAXCDB; ++i) {
371
372                 /* Create a database handle */
373                 ret = db_create(&dbp[i], dbenv, 0);
374                 if (ret) {
375                         CtdlLogPrintf(CTDL_DEBUG, "db_create: %s\n",
376                                 db_strerror(ret));
377                         exit(CTDLEXIT_DB);
378                 }
379
380
381                 /* Arbitrary names for our tables -- we reference them by
382                  * number, so we don't have string names for them.
383                  */
384                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
385
386                 ret = dbp[i]->open(dbp[i],
387                                    NULL,
388                                    dbfilename,
389                                    NULL,
390                                    DB_BTREE,
391                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
392                                    0600);
393                 if (ret) {
394                         CtdlLogPrintf(CTDL_EMERG, "db_open[%d]: %s\n", i,
395                                 db_strerror(ret));
396                         exit(CTDLEXIT_DB);
397                 }
398         }
399
400 }
401
402
403 /* Make sure we own all the files, because in a few milliseconds
404  * we're going to drop root privs.
405  */
406 void cdb_chmod_data(void) {
407         DIR *dp;
408         struct dirent *d;
409         char filename[PATH_MAX];
410
411         dp = opendir(ctdl_data_dir);
412         if (dp != NULL) {
413                 while (d = readdir(dp), d != NULL) {
414                         if (d->d_name[0] != '.') {
415                                 snprintf(filename, sizeof filename,
416                                          "%s/%s", ctdl_data_dir, d->d_name);
417                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
418                                         filename, chmod(filename, 0600)
419                                 );
420                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
421                                         filename, chown(filename, CTDLUID, (-1))
422                                 );
423                         }
424                 }
425                 closedir(dp);
426         }
427
428         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
429
430         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
431 }
432
433
434 /*
435  * Close all of the db database files we've opened.  This can be done
436  * in a loop, since it's just a bunch of closes.
437  */
438 void close_databases(void)
439 {
440         int a;
441         int ret;
442
443         ctdl_thread_internal_free_tsd();
444         
445         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
446                 CtdlLogPrintf(CTDL_EMERG,
447                         "txn_checkpoint: %s\n", db_strerror(ret));
448         }
449
450         /* print some statistics... */
451 #ifdef DB_STAT_ALL
452         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
453 #endif
454
455         /* close the tables */
456         for (a = 0; a < MAXCDB; ++a) {
457                 CtdlLogPrintf(CTDL_INFO, "Closing database %d\n", a);
458                 ret = dbp[a]->close(dbp[a], 0);
459                 if (ret) {
460                         CtdlLogPrintf(CTDL_EMERG,
461                                 "db_close: %s\n", db_strerror(ret));
462                 }
463
464         }
465
466         /* Close the handle. */
467         ret = dbenv->close(dbenv, 0);
468         if (ret) {
469                 CtdlLogPrintf(CTDL_EMERG,
470                         "DBENV->close: %s\n", db_strerror(ret));
471         }
472 }
473
474
475 /*
476  * Compression functions only used if we have zlib
477  */
478 #ifdef HAVE_ZLIB
479
480 void cdb_decompress_if_necessary(struct cdbdata *cdb)
481 {
482         static int magic = COMPRESS_MAGIC;
483         struct CtdlCompressHeader zheader;
484         char *uncompressed_data;
485         char *compressed_data;
486         uLongf destLen, sourceLen;
487
488         if (cdb == NULL)
489                 return;
490         if (cdb->ptr == NULL)
491                 return;
492         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
493                 return;
494
495         /* At this point we know we're looking at a compressed item. */
496         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
497
498         compressed_data = cdb->ptr;
499         compressed_data += sizeof(struct CtdlCompressHeader);
500
501         sourceLen = (uLongf) zheader.compressed_len;
502         destLen = (uLongf) zheader.uncompressed_len;
503         uncompressed_data = malloc(zheader.uncompressed_len);
504
505         if (uncompress((Bytef *) uncompressed_data,
506                        (uLongf *) & destLen,
507                        (const Bytef *) compressed_data,
508                        (uLong) sourceLen) != Z_OK) {
509                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
510                 abort();
511         }
512
513         free(cdb->ptr);
514         cdb->len = (size_t) destLen;
515         cdb->ptr = uncompressed_data;
516 }
517
518 #endif                          /* HAVE_ZLIB */
519
520
521 /*
522  * Store a piece of data.  Returns 0 if the operation was successful.  If a
523  * key already exists it should be overwritten.
524  */
525 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
526 {
527
528         DBT dkey, ddata;
529         DB_TXN *tid;
530         int ret = 0;
531
532 #ifdef HAVE_ZLIB
533         struct CtdlCompressHeader zheader;
534         char *compressed_data = NULL;
535         int compressing = 0;
536         size_t buffer_len = 0;
537         uLongf destLen = 0;
538 #endif
539
540         memset(&dkey, 0, sizeof(DBT));
541         memset(&ddata, 0, sizeof(DBT));
542         dkey.size = ckeylen;
543         dkey.data = ckey;
544         ddata.size = cdatalen;
545         ddata.data = cdata;
546
547 #ifdef HAVE_ZLIB
548         /* Only compress Visit records.  Everything else is uncompressed. */
549         if (cdb == CDB_VISIT) {
550                 compressing = 1;
551                 zheader.magic = COMPRESS_MAGIC;
552                 zheader.uncompressed_len = cdatalen;
553                 buffer_len = ((cdatalen * 101) / 100) + 100
554                     + sizeof(struct CtdlCompressHeader);
555                 destLen = (uLongf) buffer_len;
556                 compressed_data = malloc(buffer_len);
557                 if (compress2((Bytef *) (compressed_data +
558                                          sizeof(struct
559                                                 CtdlCompressHeader)),
560                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
561                               1) != Z_OK) {
562                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
563                         abort();
564                 }
565                 zheader.compressed_len = (size_t) destLen;
566                 memcpy(compressed_data, &zheader,
567                        sizeof(struct CtdlCompressHeader));
568                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
569                                        zheader.compressed_len);
570                 ddata.data = compressed_data;
571         }
572 #endif
573
574         if (MYTID != NULL) {
575                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
576                                     MYTID,      /* transaction ID */
577                                     &dkey,      /* key */
578                                     &ddata,     /* data */
579                                     0); /* flags */
580                 if (ret) {
581                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
582                                 db_strerror(ret));
583                         abort();
584                 }
585 #ifdef HAVE_ZLIB
586                 if (compressing)
587                         free(compressed_data);
588 #endif
589                 return ret;
590
591         } else {
592                 bailIfCursor(MYCURSORS,
593                              "attempt to write during r/o cursor");
594
595               retry:
596                 txbegin(&tid);
597
598                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
599                                          tid,   /* transaction ID */
600                                          &dkey, /* key */
601                                          &ddata,        /* data */
602                                          0))) { /* flags */
603                         if (ret == DB_LOCK_DEADLOCK) {
604                                 txabort(tid);
605                                 goto retry;
606                         } else {
607                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
608                                         cdb, db_strerror(ret));
609                                 abort();
610                         }
611                 } else {
612                         txcommit(tid);
613 #ifdef HAVE_ZLIB
614                         if (compressing)
615                                 free(compressed_data);
616 #endif
617                         return ret;
618                 }
619         }
620 }
621
622
623 /*
624  * Delete a piece of data.  Returns 0 if the operation was successful.
625  */
626 int cdb_delete(int cdb, void *key, int keylen)
627 {
628
629         DBT dkey;
630         DB_TXN *tid;
631         int ret;
632
633         memset(&dkey, 0, sizeof dkey);
634         dkey.size = keylen;
635         dkey.data = key;
636
637         if (MYTID != NULL) {
638                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
639                 if (ret) {
640                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
641                                 db_strerror(ret));
642                         if (ret != DB_NOTFOUND)
643                                 abort();
644                 }
645         } else {
646                 bailIfCursor(MYCURSORS,
647                              "attempt to delete during r/o cursor");
648
649               retry:
650                 txbegin(&tid);
651
652                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
653                     && ret != DB_NOTFOUND) {
654                         if (ret == DB_LOCK_DEADLOCK) {
655                                 txabort(tid);
656                                 goto retry;
657                         } else {
658                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
659                                         cdb, db_strerror(ret));
660                                 abort();
661                         }
662                 } else {
663                         txcommit(tid);
664                 }
665         }
666         return ret;
667 }
668
669 static DBC *localcursor(int cdb)
670 {
671         int ret;
672         DBC *curs;
673
674         if (MYCURSORS[cdb] == NULL)
675                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
676         else
677                 ret =
678                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
679                                           DB_POSITION);
680
681         if (ret) {
682                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
683                 abort();
684         }
685
686         return curs;
687 }
688
689
690 /*
691  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
692  * a struct cdbdata which it is the caller's responsibility to free later on
693  * using the cdb_free() routine.
694  */
695 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
696 {
697
698         struct cdbdata *tempcdb;
699         DBT dkey, dret;
700         int ret;
701
702         memset(&dkey, 0, sizeof(DBT));
703         dkey.size = keylen;
704         dkey.data = key;
705
706         if (MYTID != NULL) {
707                 memset(&dret, 0, sizeof(DBT));
708                 dret.flags = DB_DBT_MALLOC;
709                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
710         } else {
711                 DBC *curs;
712
713                 do {
714                         memset(&dret, 0, sizeof(DBT));
715                         dret.flags = DB_DBT_MALLOC;
716
717                         curs = localcursor(cdb);
718
719                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
720                         cclose(curs);
721                 }
722                 while (ret == DB_LOCK_DEADLOCK);
723
724         }
725
726         if ((ret != 0) && (ret != DB_NOTFOUND)) {
727                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
728                         db_strerror(ret));
729                 abort();
730         }
731
732         if (ret != 0)
733                 return NULL;
734         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
735
736         if (tempcdb == NULL) {
737                 CtdlLogPrintf(CTDL_EMERG,
738                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
739                 abort();
740         }
741
742         tempcdb->len = dret.size;
743         tempcdb->ptr = dret.data;
744 #ifdef HAVE_ZLIB
745         cdb_decompress_if_necessary(tempcdb);
746 #endif
747         return (tempcdb);
748 }
749
750
751 /*
752  * Free a cdbdata item.
753  *
754  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
755  * other code to assume ownership of that memory simply by storing the
756  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
757  * avoid freeing it.
758  */
759 void cdb_free(struct cdbdata *cdb)
760 {
761         if (cdb->ptr) {
762                 free(cdb->ptr);
763         }
764         free(cdb);
765 }
766
767 void cdb_close_cursor(int cdb)
768 {
769         if (MYCURSORS[cdb] != NULL)
770                 cclose(MYCURSORS[cdb]);
771
772         MYCURSORS[cdb] = NULL;
773 }
774
775 /* 
776  * Prepare for a sequential search of an entire database.
777  * (There is guaranteed to be no more than one traversal in
778  * progress per thread at any given time.)
779  */
780 void cdb_rewind(int cdb)
781 {
782         int ret = 0;
783
784         if (MYCURSORS[cdb] != NULL) {
785                 CtdlLogPrintf(CTDL_EMERG,
786                         "cdb_rewind: must close cursor on database %d before reopening.\n",
787                         cdb);
788                 abort();
789                 /* cclose(MYCURSORS[cdb]); */
790         }
791
792         /*
793          * Now initialize the cursor
794          */
795         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
796         if (ret) {
797                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
798                         db_strerror(ret));
799                 abort();
800         }
801 }
802
803
804 /*
805  * Fetch the next item in a sequential search.  Returns a pointer to a 
806  * cdbdata structure, or NULL if we've hit the end.
807  */
808 struct cdbdata *cdb_next_item(int cdb)
809 {
810         DBT key, data;
811         struct cdbdata *cdbret;
812         int ret = 0;
813
814         /* Initialize the key/data pair so the flags aren't set. */
815         memset(&key, 0, sizeof(key));
816         memset(&data, 0, sizeof(data));
817         data.flags = DB_DBT_MALLOC;
818
819         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
820
821         if (ret) {
822                 if (ret != DB_NOTFOUND) {
823                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
824                                 cdb, db_strerror(ret));
825                         abort();
826                 }
827                 cclose(MYCURSORS[cdb]);
828                 MYCURSORS[cdb] = NULL;
829                 return NULL;    /* presumably, end of file */
830         }
831
832         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
833         cdbret->len = data.size;
834         cdbret->ptr = data.data;
835 #ifdef HAVE_ZLIB
836         cdb_decompress_if_necessary(cdbret);
837 #endif
838
839         return (cdbret);
840 }
841
842
843
844 /*
845  * Transaction-based stuff.  I'm writing this as I bake cookies...
846  */
847
848 void cdb_begin_transaction(void)
849 {
850
851         bailIfCursor(MYCURSORS,
852                      "can't begin transaction during r/o cursor");
853
854         if (MYTID != NULL) {
855                 CtdlLogPrintf(CTDL_EMERG,
856                         "cdb_begin_transaction: ERROR: nested transaction\n");
857                 abort();
858         }
859
860         txbegin(&MYTID);
861 }
862
863 void cdb_end_transaction(void)
864 {
865         int i;
866
867         for (i = 0; i < MAXCDB; i++)
868                 if (MYCURSORS[i] != NULL) {
869                         CtdlLogPrintf(CTDL_WARNING,
870                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
871                                 i);
872                         cclose(MYCURSORS[i]);
873                         MYCURSORS[i] = NULL;
874                 }
875
876         if (MYTID == NULL) {
877                 CtdlLogPrintf(CTDL_EMERG,
878                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
879                 abort();
880         } else
881                 txcommit(MYTID);
882
883         MYTID = NULL;
884 }
885
886 /*
887  * Truncate (delete every record)
888  */
889 void cdb_trunc(int cdb)
890 {
891         /* DB_TXN *tid; */
892         int ret;
893         u_int32_t count;
894
895         if (MYTID != NULL) {
896                 CtdlLogPrintf(CTDL_EMERG,
897                         "cdb_trunc must not be called in a transaction.\n");
898                 abort();
899         } else {
900                 bailIfCursor(MYCURSORS,
901                              "attempt to write during r/o cursor");
902
903               retry:
904                 /* txbegin(&tid); */
905
906                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
907                                               NULL,     /* transaction ID */
908                                               &count,   /* #rows deleted */
909                                               0))) {    /* flags */
910                         if (ret == DB_LOCK_DEADLOCK) {
911                                 /* txabort(tid); */
912                                 goto retry;
913                         } else {
914                                 CtdlLogPrintf(CTDL_EMERG,
915                                         "cdb_truncate(%d): %s\n", cdb,
916                                         db_strerror(ret));
917                                 abort();
918                         }
919                 } else {
920                         /* txcommit(tid); */
921                 }
922         }
923 }