* database.c: removed some code that had been commented out for a long time
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Berkeley DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55 #include "control.h"
56
57 #include "ctdl_module.h"
58
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
71 {
72         if (!IsEmptyStr(msg)) {
73                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
74         }
75 }
76
77
78 /* Verbose logging callback */
79 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
80 {
81         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
82 }
83
84
85 /* just a little helper function */
86 static void txabort(DB_TXN * tid)
87 {
88         int ret;
89
90         ret = tid->abort(tid);
91
92         if (ret) {
93                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
94                         db_strerror(ret));
95                 abort();
96         }
97 }
98
99 /* this one is even more helpful than the last. */
100 static void txcommit(DB_TXN * tid)
101 {
102         int ret;
103
104         ret = tid->commit(tid, 0);
105
106         if (ret) {
107                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n",
108                         db_strerror(ret));
109                 abort();
110         }
111 }
112
113 /* are you sensing a pattern yet? */
114 static void txbegin(DB_TXN ** tid)
115 {
116         int ret;
117
118         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
119
120         if (ret) {
121                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n",
122                         db_strerror(ret));
123                 abort();
124         }
125 }
126
127 static void dbpanic(DB_ENV * env, int errval)
128 {
129         CtdlLogPrintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
130 }
131
132 static void cclose(DBC * cursor)
133 {
134         int ret;
135
136         if ((ret = cursor->c_close(cursor))) {
137                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: c_close: %s\n",
138                         db_strerror(ret));
139                 abort();
140         }
141 }
142
143 static void bailIfCursor(DBC ** cursors, const char *msg)
144 {
145         int i;
146
147         for (i = 0; i < MAXCDB; i++)
148                 if (cursors[i] != NULL) {
149                         CtdlLogPrintf(CTDL_EMERG,
150                                 "cdb_*: cursor still in progress on cdb %02x: %s\n",
151                                 i, msg);
152                         abort();
153                 }
154 }
155
156 void check_handles(void *arg)
157 {
158         if (arg != NULL) {
159                 ThreadTSD *tsd = (ThreadTSD *) arg;
160
161                 bailIfCursor(tsd->cursors, "in check_handles");
162
163                 if (tsd->tid != NULL) {
164                         CtdlLogPrintf(CTDL_EMERG,
165                                 "cdb_*: transaction still in progress!");
166                         abort();
167                 }
168         }
169 }
170
171 void cdb_check_handles(void)
172 {
173         check_handles(pthread_getspecific(ThreadKey));
174 }
175
176
177 /*
178  * Cull the database logs
179  */
180 static void cdb_cull_logs(void)
181 {
182         u_int32_t flags;
183         int ret;
184         char **file, **list;
185         char errmsg[SIZ];
186
187         flags = DB_ARCH_ABS;
188
189         /* Get the list of names. */
190         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
191                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
192                 return;
193         }
194
195         /* Print the list of names. */
196         if (list != NULL) {
197                 for (file = list; *file != NULL; ++file) {
198                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
199                         ret = unlink(*file);
200                         if (ret != 0) {
201                                 snprintf(errmsg, sizeof(errmsg),
202                                          " ** ERROR **\n \n \n "
203                                          "Citadel was unable to delete the "
204                                          "database log file '%s' because of the "
205                                          "following error:\n \n %s\n \n"
206                                          " This log file is no longer in use "
207                                          "and may be safely deleted.\n",
208                                          *file, strerror(errno));
209                                 aide_message(errmsg, "Database Warning Message");
210                         }
211                 }
212                 free(list);
213         }
214 }
215
216 /*
217  * Manually initiate log file cull.
218  */
219 void cmd_cull(char *argbuf) {
220         if (CtdlAccessCheck(ac_internal)) return;
221         cdb_cull_logs();
222         cprintf("%d Database log file cull completed.\n", CIT_OK);
223 }
224
225
226 /*
227  * Request a checkpoint of the database.  Called once per minute by the thread manager.
228  */
229 void cdb_checkpoint(void)
230 {
231         int ret;
232
233         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
234         ret = dbenv->txn_checkpoint(dbenv,
235                                     MAX_CHECKPOINT_KBYTES,
236                                     MAX_CHECKPOINT_MINUTES, 0);
237
238         if (ret != 0) {
239                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
240                         db_strerror(ret));
241                 abort();
242         }
243
244         /* After a successful checkpoint, we can cull the unused logs */
245         if (config.c_auto_cull) {
246                 cdb_cull_logs();
247         }
248 }
249
250
251
252 /*
253  * Open the various databases we'll be using.  Any database which
254  * does not exist should be created.  Note that we don't need a
255  * critical section here, because there aren't any active threads
256  * manipulating the database yet.
257  */
258 void open_databases(void)
259 {
260         int ret;
261         int i;
262         char dbfilename[32];
263         u_int32_t flags = 0;
264         int dbversion_major, dbversion_minor, dbversion_patch;
265         int current_dbversion = 0;
266
267         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
268         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
269         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
270                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
271
272         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
273
274         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
275         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
276
277         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
278            && (CitControl.MMdbversion > current_dbversion) ) {
279                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
280                                         "of Berkeley DB that is older than that which last created or\n"
281                                         "updated the database.  Because this would probably cause data\n"
282                                         "corruption or loss, the server is aborting execution now.\n");
283                 exit(CTDLEXIT_DB);
284         }
285
286         CitControl.MMdbversion = current_dbversion;
287         put_control();
288
289 #ifdef HAVE_ZLIB
290         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
291 #endif
292
293         /*
294          * Silently try to create the database subdirectory.  If it's
295          * already there, no problem.
296          */
297         mkdir(ctdl_data_dir, 0700);
298         chmod(ctdl_data_dir, 0700);
299         chown(ctdl_data_dir, CTDLUID, (-1));
300
301         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
302         db_env_set_func_yield(sched_yield);
303         ret = db_env_create(&dbenv, 0);
304         if (ret) {
305                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n",
306                         db_strerror(ret));
307                 exit(CTDLEXIT_DB);
308         }
309         dbenv->set_errpfx(dbenv, "citserver");
310         dbenv->set_paniccall(dbenv, dbpanic);
311         dbenv->set_errcall(dbenv, cdb_verbose_err);
312         dbenv->set_errpfx(dbenv, "ctdl");
313 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
314         dbenv->set_msgcall(dbenv, cdb_verbose_log);
315 #endif
316         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
317         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
318
319         /*
320          * We want to specify the shared memory buffer pool cachesize,
321          * but everything else is the default.
322          */
323         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
324         if (ret) {
325                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n",
326                         db_strerror(ret));
327                 dbenv->close(dbenv, 0);
328                 exit(CTDLEXIT_DB);
329         }
330
331         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
332                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n",
333                         db_strerror(ret));
334                 dbenv->close(dbenv, 0);
335                 exit(CTDLEXIT_DB);
336         }
337
338         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
339         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
340         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
341         if (ret == DB_RUNRECOVERY) {
342                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
343                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
344                 flags |= DB_RECOVER;
345                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
346         }
347         if (ret == DB_RUNRECOVERY) {
348                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
349                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
350                 flags &= ~DB_RECOVER;
351                 flags |= DB_RECOVER_FATAL;
352                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
353         }
354         if (ret) {
355                 CtdlLogPrintf(CTDL_DEBUG, "dbenv->open: %s\n", db_strerror(ret));
356                 dbenv->close(dbenv, 0);
357                 exit(CTDLEXIT_DB);
358         }
359
360         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
361
362         for (i = 0; i < MAXCDB; ++i) {
363
364                 /* Create a database handle */
365                 ret = db_create(&dbp[i], dbenv, 0);
366                 if (ret) {
367                         CtdlLogPrintf(CTDL_DEBUG, "db_create: %s\n",
368                                 db_strerror(ret));
369                         exit(CTDLEXIT_DB);
370                 }
371
372
373                 /* Arbitrary names for our tables -- we reference them by
374                  * number, so we don't have string names for them.
375                  */
376                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
377
378                 ret = dbp[i]->open(dbp[i],
379                                    NULL,
380                                    dbfilename,
381                                    NULL,
382                                    DB_BTREE,
383                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
384                                    0600);
385                 if (ret) {
386                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i,
387                                 db_strerror(ret));
388                         exit(CTDLEXIT_DB);
389                 }
390         }
391
392 }
393
394
395 /* Make sure we own all the files, because in a few milliseconds
396  * we're going to drop root privs.
397  */
398 void cdb_chmod_data(void) {
399         DIR *dp;
400         struct dirent *d;
401         char filename[PATH_MAX];
402
403         dp = opendir(ctdl_data_dir);
404         if (dp != NULL) {
405                 while (d = readdir(dp), d != NULL) {
406                         if (d->d_name[0] != '.') {
407                                 snprintf(filename, sizeof filename,
408                                          "%s/%s", ctdl_data_dir, d->d_name);
409                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
410                                         filename, chmod(filename, 0600)
411                                 );
412                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
413                                         filename, chown(filename, CTDLUID, (-1))
414                                 );
415                         }
416                 }
417                 closedir(dp);
418         }
419
420         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
421
422         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
423 }
424
425
426 /*
427  * Close all of the db database files we've opened.  This can be done
428  * in a loop, since it's just a bunch of closes.
429  */
430 void close_databases(void)
431 {
432         int a;
433         int ret;
434
435         ctdl_thread_internal_free_tsd();
436         
437         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
438                 CtdlLogPrintf(CTDL_EMERG,
439                         "txn_checkpoint: %s\n", db_strerror(ret));
440         }
441
442         /* print some statistics... */
443 #ifdef DB_STAT_ALL
444         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
445 #endif
446
447         /* close the tables */
448         for (a = 0; a < MAXCDB; ++a) {
449                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
450                 ret = dbp[a]->close(dbp[a], 0);
451                 if (ret) {
452                         CtdlLogPrintf(CTDL_EMERG,
453                                 "db_close: %s\n", db_strerror(ret));
454                 }
455
456         }
457
458         /* Close the handle. */
459         ret = dbenv->close(dbenv, 0);
460         if (ret) {
461                 CtdlLogPrintf(CTDL_EMERG,
462                         "DBENV->close: %s\n", db_strerror(ret));
463         }
464 }
465
466
467 /*
468  * Compression functions only used if we have zlib
469  */
470 #ifdef HAVE_ZLIB
471
472 void cdb_decompress_if_necessary(struct cdbdata *cdb)
473 {
474         static int magic = COMPRESS_MAGIC;
475         struct CtdlCompressHeader zheader;
476         char *uncompressed_data;
477         char *compressed_data;
478         uLongf destLen, sourceLen;
479
480         if (cdb == NULL)
481                 return;
482         if (cdb->ptr == NULL)
483                 return;
484         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
485                 return;
486
487         /* At this point we know we're looking at a compressed item. */
488         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
489
490         compressed_data = cdb->ptr;
491         compressed_data += sizeof(struct CtdlCompressHeader);
492
493         sourceLen = (uLongf) zheader.compressed_len;
494         destLen = (uLongf) zheader.uncompressed_len;
495         uncompressed_data = malloc(zheader.uncompressed_len);
496
497         if (uncompress((Bytef *) uncompressed_data,
498                        (uLongf *) & destLen,
499                        (const Bytef *) compressed_data,
500                        (uLong) sourceLen) != Z_OK) {
501                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
502                 abort();
503         }
504
505         free(cdb->ptr);
506         cdb->len = (size_t) destLen;
507         cdb->ptr = uncompressed_data;
508 }
509
510 #endif                          /* HAVE_ZLIB */
511
512
513 /*
514  * Store a piece of data.  Returns 0 if the operation was successful.  If a
515  * key already exists it should be overwritten.
516  */
517 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
518 {
519
520         DBT dkey, ddata;
521         DB_TXN *tid;
522         int ret = 0;
523
524 #ifdef HAVE_ZLIB
525         struct CtdlCompressHeader zheader;
526         char *compressed_data = NULL;
527         int compressing = 0;
528         size_t buffer_len = 0;
529         uLongf destLen = 0;
530 #endif
531
532         memset(&dkey, 0, sizeof(DBT));
533         memset(&ddata, 0, sizeof(DBT));
534         dkey.size = ckeylen;
535         dkey.data = ckey;
536         ddata.size = cdatalen;
537         ddata.data = cdata;
538
539 #ifdef HAVE_ZLIB
540         /* Only compress Visit records.  Everything else is uncompressed. */
541         if (cdb == CDB_VISIT) {
542                 compressing = 1;
543                 zheader.magic = COMPRESS_MAGIC;
544                 zheader.uncompressed_len = cdatalen;
545                 buffer_len = ((cdatalen * 101) / 100) + 100
546                     + sizeof(struct CtdlCompressHeader);
547                 destLen = (uLongf) buffer_len;
548                 compressed_data = malloc(buffer_len);
549                 if (compress2((Bytef *) (compressed_data +
550                                          sizeof(struct
551                                                 CtdlCompressHeader)),
552                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
553                               1) != Z_OK) {
554                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
555                         abort();
556                 }
557                 zheader.compressed_len = (size_t) destLen;
558                 memcpy(compressed_data, &zheader,
559                        sizeof(struct CtdlCompressHeader));
560                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
561                                        zheader.compressed_len);
562                 ddata.data = compressed_data;
563         }
564 #endif
565
566         if (MYTID != NULL) {
567                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
568                                     MYTID,      /* transaction ID */
569                                     &dkey,      /* key */
570                                     &ddata,     /* data */
571                                     0); /* flags */
572                 if (ret) {
573                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
574                                 db_strerror(ret));
575                         abort();
576                 }
577 #ifdef HAVE_ZLIB
578                 if (compressing)
579                         free(compressed_data);
580 #endif
581                 return ret;
582
583         } else {
584                 bailIfCursor(MYCURSORS,
585                              "attempt to write during r/o cursor");
586
587               retry:
588                 txbegin(&tid);
589
590                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
591                                          tid,   /* transaction ID */
592                                          &dkey, /* key */
593                                          &ddata,        /* data */
594                                          0))) { /* flags */
595                         if (ret == DB_LOCK_DEADLOCK) {
596                                 txabort(tid);
597                                 goto retry;
598                         } else {
599                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
600                                         cdb, db_strerror(ret));
601                                 abort();
602                         }
603                 } else {
604                         txcommit(tid);
605 #ifdef HAVE_ZLIB
606                         if (compressing)
607                                 free(compressed_data);
608 #endif
609                         return ret;
610                 }
611         }
612 }
613
614
615 /*
616  * Delete a piece of data.  Returns 0 if the operation was successful.
617  */
618 int cdb_delete(int cdb, void *key, int keylen)
619 {
620
621         DBT dkey;
622         DB_TXN *tid;
623         int ret;
624
625         memset(&dkey, 0, sizeof dkey);
626         dkey.size = keylen;
627         dkey.data = key;
628
629         if (MYTID != NULL) {
630                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
631                 if (ret) {
632                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
633                                 db_strerror(ret));
634                         if (ret != DB_NOTFOUND)
635                                 abort();
636                 }
637         } else {
638                 bailIfCursor(MYCURSORS,
639                              "attempt to delete during r/o cursor");
640
641               retry:
642                 txbegin(&tid);
643
644                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
645                     && ret != DB_NOTFOUND) {
646                         if (ret == DB_LOCK_DEADLOCK) {
647                                 txabort(tid);
648                                 goto retry;
649                         } else {
650                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
651                                         cdb, db_strerror(ret));
652                                 abort();
653                         }
654                 } else {
655                         txcommit(tid);
656                 }
657         }
658         return ret;
659 }
660
661 static DBC *localcursor(int cdb)
662 {
663         int ret;
664         DBC *curs;
665
666         if (MYCURSORS[cdb] == NULL)
667                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
668         else
669                 ret =
670                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
671                                           DB_POSITION);
672
673         if (ret) {
674                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
675                 abort();
676         }
677
678         return curs;
679 }
680
681
682 /*
683  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
684  * a struct cdbdata which it is the caller's responsibility to free later on
685  * using the cdb_free() routine.
686  */
687 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
688 {
689
690         struct cdbdata *tempcdb;
691         DBT dkey, dret;
692         int ret;
693
694         memset(&dkey, 0, sizeof(DBT));
695         dkey.size = keylen;
696         dkey.data = key;
697
698         if (MYTID != NULL) {
699                 memset(&dret, 0, sizeof(DBT));
700                 dret.flags = DB_DBT_MALLOC;
701                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
702         } else {
703                 DBC *curs;
704
705                 do {
706                         memset(&dret, 0, sizeof(DBT));
707                         dret.flags = DB_DBT_MALLOC;
708
709                         curs = localcursor(cdb);
710
711                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
712                         cclose(curs);
713                 }
714                 while (ret == DB_LOCK_DEADLOCK);
715
716         }
717
718         if ((ret != 0) && (ret != DB_NOTFOUND)) {
719                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
720                         db_strerror(ret));
721                 abort();
722         }
723
724         if (ret != 0)
725                 return NULL;
726         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
727
728         if (tempcdb == NULL) {
729                 CtdlLogPrintf(CTDL_EMERG,
730                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
731                 abort();
732         }
733
734         tempcdb->len = dret.size;
735         tempcdb->ptr = dret.data;
736 #ifdef HAVE_ZLIB
737         cdb_decompress_if_necessary(tempcdb);
738 #endif
739         return (tempcdb);
740 }
741
742
743 /*
744  * Free a cdbdata item.
745  *
746  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
747  * other code to assume ownership of that memory simply by storing the
748  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
749  * avoid freeing it.
750  */
751 void cdb_free(struct cdbdata *cdb)
752 {
753         if (cdb->ptr) {
754                 free(cdb->ptr);
755         }
756         free(cdb);
757 }
758
759 void cdb_close_cursor(int cdb)
760 {
761         if (MYCURSORS[cdb] != NULL)
762                 cclose(MYCURSORS[cdb]);
763
764         MYCURSORS[cdb] = NULL;
765 }
766
767 /* 
768  * Prepare for a sequential search of an entire database.
769  * (There is guaranteed to be no more than one traversal in
770  * progress per thread at any given time.)
771  */
772 void cdb_rewind(int cdb)
773 {
774         int ret = 0;
775
776         if (MYCURSORS[cdb] != NULL) {
777                 CtdlLogPrintf(CTDL_EMERG,
778                         "cdb_rewind: must close cursor on database %d before reopening.\n",
779                         cdb);
780                 abort();
781                 /* cclose(MYCURSORS[cdb]); */
782         }
783
784         /*
785          * Now initialize the cursor
786          */
787         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
788         if (ret) {
789                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
790                         db_strerror(ret));
791                 abort();
792         }
793 }
794
795
796 /*
797  * Fetch the next item in a sequential search.  Returns a pointer to a 
798  * cdbdata structure, or NULL if we've hit the end.
799  */
800 struct cdbdata *cdb_next_item(int cdb)
801 {
802         DBT key, data;
803         struct cdbdata *cdbret;
804         int ret = 0;
805
806         /* Initialize the key/data pair so the flags aren't set. */
807         memset(&key, 0, sizeof(key));
808         memset(&data, 0, sizeof(data));
809         data.flags = DB_DBT_MALLOC;
810
811         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
812
813         if (ret) {
814                 if (ret != DB_NOTFOUND) {
815                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
816                                 cdb, db_strerror(ret));
817                         abort();
818                 }
819                 cclose(MYCURSORS[cdb]);
820                 MYCURSORS[cdb] = NULL;
821                 return NULL;    /* presumably, end of file */
822         }
823
824         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
825         cdbret->len = data.size;
826         cdbret->ptr = data.data;
827 #ifdef HAVE_ZLIB
828         cdb_decompress_if_necessary(cdbret);
829 #endif
830
831         return (cdbret);
832 }
833
834
835
836 /*
837  * Transaction-based stuff.  I'm writing this as I bake cookies...
838  */
839
840 void cdb_begin_transaction(void)
841 {
842
843         bailIfCursor(MYCURSORS,
844                      "can't begin transaction during r/o cursor");
845
846         if (MYTID != NULL) {
847                 CtdlLogPrintf(CTDL_EMERG,
848                         "cdb_begin_transaction: ERROR: nested transaction\n");
849                 abort();
850         }
851
852         txbegin(&MYTID);
853 }
854
855 void cdb_end_transaction(void)
856 {
857         int i;
858
859         for (i = 0; i < MAXCDB; i++)
860                 if (MYCURSORS[i] != NULL) {
861                         CtdlLogPrintf(CTDL_WARNING,
862                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
863                                 i);
864                         cclose(MYCURSORS[i]);
865                         MYCURSORS[i] = NULL;
866                 }
867
868         if (MYTID == NULL) {
869                 CtdlLogPrintf(CTDL_EMERG,
870                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
871                 abort();
872         } else
873                 txcommit(MYTID);
874
875         MYTID = NULL;
876 }
877
878 /*
879  * Truncate (delete every record)
880  */
881 void cdb_trunc(int cdb)
882 {
883         /* DB_TXN *tid; */
884         int ret;
885         u_int32_t count;
886
887         if (MYTID != NULL) {
888                 CtdlLogPrintf(CTDL_EMERG,
889                         "cdb_trunc must not be called in a transaction.\n");
890                 abort();
891         } else {
892                 bailIfCursor(MYCURSORS,
893                              "attempt to write during r/o cursor");
894
895               retry:
896                 /* txbegin(&tid); */
897
898                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
899                                               NULL,     /* transaction ID */
900                                               &count,   /* #rows deleted */
901                                               0))) {    /* flags */
902                         if (ret == DB_LOCK_DEADLOCK) {
903                                 /* txabort(tid); */
904                                 goto retry;
905                         } else {
906                                 CtdlLogPrintf(CTDL_EMERG,
907                                         "cdb_truncate(%d): %s\n", cdb,
908                                         db_strerror(ret));
909                                 abort();
910                         }
911                 } else {
912                         /* txcommit(tid); */
913                 }
914         }
915 }