Renamed database_sleepycat.c to database.c
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Berkeley DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55 #include "control.h"
56
57 #include "ctdl_module.h"
58
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
71 {
72         CtdlLogPrintf(CTDL_DEBUG, "BDB: %s\n", msg);
73 }
74
75
76 /* Verbose logging callback */
77 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
78 {
79         CtdlLogPrintf(CTDL_ALERT, "BDB: %s\n", msg);
80 }
81
82
83 /* just a little helper function */
84 static void txabort(DB_TXN * tid)
85 {
86         int ret;
87
88         ret = tid->abort(tid);
89
90         if (ret) {
91                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
92                         db_strerror(ret));
93                 abort();
94         }
95 }
96
97 /* this one is even more helpful than the last. */
98 static void txcommit(DB_TXN * tid)
99 {
100         int ret;
101
102         ret = tid->commit(tid, 0);
103
104         if (ret) {
105                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n",
106                         db_strerror(ret));
107                 abort();
108         }
109 }
110
111 /* are you sensing a pattern yet? */
112 static void txbegin(DB_TXN ** tid)
113 {
114         int ret;
115
116         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
117
118         if (ret) {
119                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n",
120                         db_strerror(ret));
121                 abort();
122         }
123 }
124
125 static void dbpanic(DB_ENV * env, int errval)
126 {
127         CtdlLogPrintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
128 }
129
130 static void cclose(DBC * cursor)
131 {
132         int ret;
133
134         if ((ret = cursor->c_close(cursor))) {
135                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: c_close: %s\n",
136                         db_strerror(ret));
137                 abort();
138         }
139 }
140
141 static void bailIfCursor(DBC ** cursors, const char *msg)
142 {
143         int i;
144
145         for (i = 0; i < MAXCDB; i++)
146                 if (cursors[i] != NULL) {
147                         CtdlLogPrintf(CTDL_EMERG,
148                                 "cdb_*: cursor still in progress on cdb %02x: %s\n",
149                                 i, msg);
150                         abort();
151                 }
152 }
153
154 void check_handles(void *arg)
155 {
156         if (arg != NULL) {
157                 ThreadTSD *tsd = (ThreadTSD *) arg;
158
159                 bailIfCursor(tsd->cursors, "in check_handles");
160
161                 if (tsd->tid != NULL) {
162                         CtdlLogPrintf(CTDL_EMERG,
163                                 "cdb_*: transaction still in progress!");
164                         abort();
165                 }
166         }
167 }
168
169 void cdb_check_handles(void)
170 {
171         check_handles(pthread_getspecific(ThreadKey));
172 }
173
174
175 /*
176  * Cull the database logs
177  */
178 static void cdb_cull_logs(void)
179 {
180         u_int32_t flags;
181         int ret;
182         char **file, **list;
183         char errmsg[SIZ];
184
185         flags = DB_ARCH_ABS;
186
187         /* Get the list of names. */
188         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
189                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
190                 return;
191         }
192
193         /* Print the list of names. */
194         if (list != NULL) {
195                 for (file = list; *file != NULL; ++file) {
196                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
197                         ret = unlink(*file);
198                         if (ret != 0) {
199                                 snprintf(errmsg, sizeof(errmsg),
200                                          " ** ERROR **\n \n \n "
201                                          "Citadel was unable to delete the "
202                                          "database log file '%s' because of the "
203                                          "following error:\n \n %s\n \n"
204                                          " This log file is no longer in use "
205                                          "and may be safely deleted.\n",
206                                          *file, strerror(errno));
207                                 aide_message(errmsg, "Database Warning Message");
208                         }
209                 }
210                 free(list);
211         }
212 }
213
214 /*
215  * Manually initiate log file cull.
216  */
217 void cmd_cull(char *argbuf) {
218         if (CtdlAccessCheck(ac_internal)) return;
219         cdb_cull_logs();
220         cprintf("%d Database log file cull completed.\n", CIT_OK);
221 }
222
223
224 /*
225  * Request a checkpoint of the database.
226  */
227 void cdb_checkpoint(void)
228 {
229         int ret;
230 //      static time_t last_run = 0L;
231
232         /* Only do a checkpoint once per minute. */
233 /*
234  * Don't need this any more, since the thread that calls us sleeps for 60 seconds between calls
235  
236         if ((time(NULL) - last_run) < 60L) {
237                 return;
238         }
239         last_run = time(NULL);
240 */
241
242         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
243         ret = dbenv->txn_checkpoint(dbenv,
244                                     MAX_CHECKPOINT_KBYTES,
245                                     MAX_CHECKPOINT_MINUTES, 0);
246
247         if (ret != 0) {
248                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
249                         db_strerror(ret));
250                 abort();
251         }
252
253         /* After a successful checkpoint, we can cull the unused logs */
254         if (config.c_auto_cull) {
255                 cdb_cull_logs();
256         }
257 }
258
259
260
261 /*
262  * Open the various databases we'll be using.  Any database which
263  * does not exist should be created.  Note that we don't need a
264  * critical section here, because there aren't any active threads
265  * manipulating the database yet.
266  */
267 void open_databases(void)
268 {
269         int ret;
270         int i;
271         char dbfilename[SIZ];
272         u_int32_t flags = 0;
273         int dbversion_major, dbversion_minor, dbversion_patch;
274         int current_dbversion = 0;
275
276         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
277         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
278         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
279                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
280
281         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
282
283         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
284         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
285
286         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
287            && (CitControl.MMdbversion > current_dbversion) ) {
288                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
289                                         "of Berkeley DB that is older than that which last created or\n"
290                                         "updated the database.  Because this would probably cause data\n"
291                                         "corruption or loss, the server is aborting execution now.\n");
292                 exit(CTDLEXIT_DB);
293         }
294
295         CitControl.MMdbversion = current_dbversion;
296         put_control();
297
298 #ifdef HAVE_ZLIB
299         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
300 #endif
301
302         /*
303          * Silently try to create the database subdirectory.  If it's
304          * already there, no problem.
305          */
306         mkdir(ctdl_data_dir, 0700);
307         chmod(ctdl_data_dir, 0700);
308         chown(ctdl_data_dir, CTDLUID, (-1));
309
310         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
311         db_env_set_func_yield(sched_yield);
312         ret = db_env_create(&dbenv, 0);
313         if (ret) {
314                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n",
315                         db_strerror(ret));
316                 exit(CTDLEXIT_DB);
317         }
318         dbenv->set_errpfx(dbenv, "citserver");
319         dbenv->set_paniccall(dbenv, dbpanic);
320         dbenv->set_errcall(dbenv, cdb_verbose_err);
321         dbenv->set_errpfx(dbenv, "ctdl");
322 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
323         dbenv->set_msgcall(dbenv, cdb_verbose_log);
324 #endif
325         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
326         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
327
328         /*
329          * We want to specify the shared memory buffer pool cachesize,
330          * but everything else is the default.
331          */
332         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
333         if (ret) {
334                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n",
335                         db_strerror(ret));
336                 dbenv->close(dbenv, 0);
337                 exit(CTDLEXIT_DB);
338         }
339
340         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
341                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n",
342                         db_strerror(ret));
343                 dbenv->close(dbenv, 0);
344                 exit(CTDLEXIT_DB);
345         }
346
347         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
348         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
349         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
350         if (ret == DB_RUNRECOVERY) {
351                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
352                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
353                 flags |= DB_RECOVER;
354                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
355         }
356         if (ret == DB_RUNRECOVERY) {
357                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
358                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
359                 flags &= ~DB_RECOVER;
360                 flags |= DB_RECOVER_FATAL;
361                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
362         }
363         if (ret) {
364                 CtdlLogPrintf(CTDL_DEBUG, "dbenv->open: %s\n", db_strerror(ret));
365                 dbenv->close(dbenv, 0);
366                 exit(CTDLEXIT_DB);
367         }
368
369         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
370
371         for (i = 0; i < MAXCDB; ++i) {
372
373                 /* Create a database handle */
374                 ret = db_create(&dbp[i], dbenv, 0);
375                 if (ret) {
376                         CtdlLogPrintf(CTDL_DEBUG, "db_create: %s\n",
377                                 db_strerror(ret));
378                         exit(CTDLEXIT_DB);
379                 }
380
381
382                 /* Arbitrary names for our tables -- we reference them by
383                  * number, so we don't have string names for them.
384                  */
385                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
386
387                 ret = dbp[i]->open(dbp[i],
388                                    NULL,
389                                    dbfilename,
390                                    NULL,
391                                    DB_BTREE,
392                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
393                                    0600);
394                 if (ret) {
395                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i,
396                                 db_strerror(ret));
397                         exit(CTDLEXIT_DB);
398                 }
399         }
400
401 }
402
403
404 /* Make sure we own all the files, because in a few milliseconds
405  * we're going to drop root privs.
406  */
407 void cdb_chmod_data(void) {
408         DIR *dp;
409         struct dirent *d;
410         char filename[PATH_MAX];
411
412         dp = opendir(ctdl_data_dir);
413         if (dp != NULL) {
414                 while (d = readdir(dp), d != NULL) {
415                         if (d->d_name[0] != '.') {
416                                 snprintf(filename, sizeof filename,
417                                          "%s/%s", ctdl_data_dir, d->d_name);
418                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
419                                         filename, chmod(filename, 0600)
420                                 );
421                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
422                                         filename, chown(filename, CTDLUID, (-1))
423                                 );
424                         }
425                 }
426                 closedir(dp);
427         }
428
429         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
430
431         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
432 }
433
434
435 /*
436  * Close all of the db database files we've opened.  This can be done
437  * in a loop, since it's just a bunch of closes.
438  */
439 void close_databases(void)
440 {
441         int a;
442         int ret;
443
444         ctdl_thread_internal_free_tsd();
445         
446         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
447                 CtdlLogPrintf(CTDL_EMERG,
448                         "txn_checkpoint: %s\n", db_strerror(ret));
449         }
450
451         /* print some statistics... */
452 #ifdef DB_STAT_ALL
453         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
454 #endif
455
456         /* close the tables */
457         for (a = 0; a < MAXCDB; ++a) {
458                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
459                 ret = dbp[a]->close(dbp[a], 0);
460                 if (ret) {
461                         CtdlLogPrintf(CTDL_EMERG,
462                                 "db_close: %s\n", db_strerror(ret));
463                 }
464
465         }
466
467         /* Close the handle. */
468         ret = dbenv->close(dbenv, 0);
469         if (ret) {
470                 CtdlLogPrintf(CTDL_EMERG,
471                         "DBENV->close: %s\n", db_strerror(ret));
472         }
473 }
474
475
476 /*
477  * Compression functions only used if we have zlib
478  */
479 #ifdef HAVE_ZLIB
480
481 void cdb_decompress_if_necessary(struct cdbdata *cdb)
482 {
483         static int magic = COMPRESS_MAGIC;
484         struct CtdlCompressHeader zheader;
485         char *uncompressed_data;
486         char *compressed_data;
487         uLongf destLen, sourceLen;
488
489         if (cdb == NULL)
490                 return;
491         if (cdb->ptr == NULL)
492                 return;
493         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
494                 return;
495
496         /* At this point we know we're looking at a compressed item. */
497         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
498
499         compressed_data = cdb->ptr;
500         compressed_data += sizeof(struct CtdlCompressHeader);
501
502         sourceLen = (uLongf) zheader.compressed_len;
503         destLen = (uLongf) zheader.uncompressed_len;
504         uncompressed_data = malloc(zheader.uncompressed_len);
505
506         if (uncompress((Bytef *) uncompressed_data,
507                        (uLongf *) & destLen,
508                        (const Bytef *) compressed_data,
509                        (uLong) sourceLen) != Z_OK) {
510                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
511                 abort();
512         }
513
514         free(cdb->ptr);
515         cdb->len = (size_t) destLen;
516         cdb->ptr = uncompressed_data;
517 }
518
519 #endif                          /* HAVE_ZLIB */
520
521
522 /*
523  * Store a piece of data.  Returns 0 if the operation was successful.  If a
524  * key already exists it should be overwritten.
525  */
526 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
527 {
528
529         DBT dkey, ddata;
530         DB_TXN *tid;
531         int ret = 0;
532
533 #ifdef HAVE_ZLIB
534         struct CtdlCompressHeader zheader;
535         char *compressed_data = NULL;
536         int compressing = 0;
537         size_t buffer_len = 0;
538         uLongf destLen = 0;
539 #endif
540
541         memset(&dkey, 0, sizeof(DBT));
542         memset(&ddata, 0, sizeof(DBT));
543         dkey.size = ckeylen;
544         dkey.data = ckey;
545         ddata.size = cdatalen;
546         ddata.data = cdata;
547
548 #ifdef HAVE_ZLIB
549         /* Only compress Visit records.  Everything else is uncompressed. */
550         if (cdb == CDB_VISIT) {
551                 compressing = 1;
552                 zheader.magic = COMPRESS_MAGIC;
553                 zheader.uncompressed_len = cdatalen;
554                 buffer_len = ((cdatalen * 101) / 100) + 100
555                     + sizeof(struct CtdlCompressHeader);
556                 destLen = (uLongf) buffer_len;
557                 compressed_data = malloc(buffer_len);
558                 if (compress2((Bytef *) (compressed_data +
559                                          sizeof(struct
560                                                 CtdlCompressHeader)),
561                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
562                               1) != Z_OK) {
563                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
564                         abort();
565                 }
566                 zheader.compressed_len = (size_t) destLen;
567                 memcpy(compressed_data, &zheader,
568                        sizeof(struct CtdlCompressHeader));
569                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
570                                        zheader.compressed_len);
571                 ddata.data = compressed_data;
572         }
573 #endif
574
575         if (MYTID != NULL) {
576                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
577                                     MYTID,      /* transaction ID */
578                                     &dkey,      /* key */
579                                     &ddata,     /* data */
580                                     0); /* flags */
581                 if (ret) {
582                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
583                                 db_strerror(ret));
584                         abort();
585                 }
586 #ifdef HAVE_ZLIB
587                 if (compressing)
588                         free(compressed_data);
589 #endif
590                 return ret;
591
592         } else {
593                 bailIfCursor(MYCURSORS,
594                              "attempt to write during r/o cursor");
595
596               retry:
597                 txbegin(&tid);
598
599                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
600                                          tid,   /* transaction ID */
601                                          &dkey, /* key */
602                                          &ddata,        /* data */
603                                          0))) { /* flags */
604                         if (ret == DB_LOCK_DEADLOCK) {
605                                 txabort(tid);
606                                 goto retry;
607                         } else {
608                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
609                                         cdb, db_strerror(ret));
610                                 abort();
611                         }
612                 } else {
613                         txcommit(tid);
614 #ifdef HAVE_ZLIB
615                         if (compressing)
616                                 free(compressed_data);
617 #endif
618                         return ret;
619                 }
620         }
621 }
622
623
624 /*
625  * Delete a piece of data.  Returns 0 if the operation was successful.
626  */
627 int cdb_delete(int cdb, void *key, int keylen)
628 {
629
630         DBT dkey;
631         DB_TXN *tid;
632         int ret;
633
634         memset(&dkey, 0, sizeof dkey);
635         dkey.size = keylen;
636         dkey.data = key;
637
638         if (MYTID != NULL) {
639                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
640                 if (ret) {
641                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
642                                 db_strerror(ret));
643                         if (ret != DB_NOTFOUND)
644                                 abort();
645                 }
646         } else {
647                 bailIfCursor(MYCURSORS,
648                              "attempt to delete during r/o cursor");
649
650               retry:
651                 txbegin(&tid);
652
653                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
654                     && ret != DB_NOTFOUND) {
655                         if (ret == DB_LOCK_DEADLOCK) {
656                                 txabort(tid);
657                                 goto retry;
658                         } else {
659                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
660                                         cdb, db_strerror(ret));
661                                 abort();
662                         }
663                 } else {
664                         txcommit(tid);
665                 }
666         }
667         return ret;
668 }
669
670 static DBC *localcursor(int cdb)
671 {
672         int ret;
673         DBC *curs;
674
675         if (MYCURSORS[cdb] == NULL)
676                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
677         else
678                 ret =
679                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
680                                           DB_POSITION);
681
682         if (ret) {
683                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
684                 abort();
685         }
686
687         return curs;
688 }
689
690
691 /*
692  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
693  * a struct cdbdata which it is the caller's responsibility to free later on
694  * using the cdb_free() routine.
695  */
696 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
697 {
698
699         struct cdbdata *tempcdb;
700         DBT dkey, dret;
701         int ret;
702
703         memset(&dkey, 0, sizeof(DBT));
704         dkey.size = keylen;
705         dkey.data = key;
706
707         if (MYTID != NULL) {
708                 memset(&dret, 0, sizeof(DBT));
709                 dret.flags = DB_DBT_MALLOC;
710                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
711         } else {
712                 DBC *curs;
713
714                 do {
715                         memset(&dret, 0, sizeof(DBT));
716                         dret.flags = DB_DBT_MALLOC;
717
718                         curs = localcursor(cdb);
719
720                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
721                         cclose(curs);
722                 }
723                 while (ret == DB_LOCK_DEADLOCK);
724
725         }
726
727         if ((ret != 0) && (ret != DB_NOTFOUND)) {
728                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
729                         db_strerror(ret));
730                 abort();
731         }
732
733         if (ret != 0)
734                 return NULL;
735         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
736
737         if (tempcdb == NULL) {
738                 CtdlLogPrintf(CTDL_EMERG,
739                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
740                 abort();
741         }
742
743         tempcdb->len = dret.size;
744         tempcdb->ptr = dret.data;
745 #ifdef HAVE_ZLIB
746         cdb_decompress_if_necessary(tempcdb);
747 #endif
748         return (tempcdb);
749 }
750
751
752 /*
753  * Free a cdbdata item.
754  *
755  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
756  * other code to assume ownership of that memory simply by storing the
757  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
758  * avoid freeing it.
759  */
760 void cdb_free(struct cdbdata *cdb)
761 {
762         if (cdb->ptr) {
763                 free(cdb->ptr);
764         }
765         free(cdb);
766 }
767
768 void cdb_close_cursor(int cdb)
769 {
770         if (MYCURSORS[cdb] != NULL)
771                 cclose(MYCURSORS[cdb]);
772
773         MYCURSORS[cdb] = NULL;
774 }
775
776 /* 
777  * Prepare for a sequential search of an entire database.
778  * (There is guaranteed to be no more than one traversal in
779  * progress per thread at any given time.)
780  */
781 void cdb_rewind(int cdb)
782 {
783         int ret = 0;
784
785         if (MYCURSORS[cdb] != NULL) {
786                 CtdlLogPrintf(CTDL_EMERG,
787                         "cdb_rewind: must close cursor on database %d before reopening.\n",
788                         cdb);
789                 abort();
790                 /* cclose(MYCURSORS[cdb]); */
791         }
792
793         /*
794          * Now initialize the cursor
795          */
796         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
797         if (ret) {
798                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
799                         db_strerror(ret));
800                 abort();
801         }
802 }
803
804
805 /*
806  * Fetch the next item in a sequential search.  Returns a pointer to a 
807  * cdbdata structure, or NULL if we've hit the end.
808  */
809 struct cdbdata *cdb_next_item(int cdb)
810 {
811         DBT key, data;
812         struct cdbdata *cdbret;
813         int ret = 0;
814
815         /* Initialize the key/data pair so the flags aren't set. */
816         memset(&key, 0, sizeof(key));
817         memset(&data, 0, sizeof(data));
818         data.flags = DB_DBT_MALLOC;
819
820         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
821
822         if (ret) {
823                 if (ret != DB_NOTFOUND) {
824                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
825                                 cdb, db_strerror(ret));
826                         abort();
827                 }
828                 cclose(MYCURSORS[cdb]);
829                 MYCURSORS[cdb] = NULL;
830                 return NULL;    /* presumably, end of file */
831         }
832
833         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
834         cdbret->len = data.size;
835         cdbret->ptr = data.data;
836 #ifdef HAVE_ZLIB
837         cdb_decompress_if_necessary(cdbret);
838 #endif
839
840         return (cdbret);
841 }
842
843
844
845 /*
846  * Transaction-based stuff.  I'm writing this as I bake cookies...
847  */
848
849 void cdb_begin_transaction(void)
850 {
851
852         bailIfCursor(MYCURSORS,
853                      "can't begin transaction during r/o cursor");
854
855         if (MYTID != NULL) {
856                 CtdlLogPrintf(CTDL_EMERG,
857                         "cdb_begin_transaction: ERROR: nested transaction\n");
858                 abort();
859         }
860
861         txbegin(&MYTID);
862 }
863
864 void cdb_end_transaction(void)
865 {
866         int i;
867
868         for (i = 0; i < MAXCDB; i++)
869                 if (MYCURSORS[i] != NULL) {
870                         CtdlLogPrintf(CTDL_WARNING,
871                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
872                                 i);
873                         cclose(MYCURSORS[i]);
874                         MYCURSORS[i] = NULL;
875                 }
876
877         if (MYTID == NULL) {
878                 CtdlLogPrintf(CTDL_EMERG,
879                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
880                 abort();
881         } else
882                 txcommit(MYTID);
883
884         MYTID = NULL;
885 }
886
887 /*
888  * Truncate (delete every record)
889  */
890 void cdb_trunc(int cdb)
891 {
892         /* DB_TXN *tid; */
893         int ret;
894         u_int32_t count;
895
896         if (MYTID != NULL) {
897                 CtdlLogPrintf(CTDL_EMERG,
898                         "cdb_trunc must not be called in a transaction.\n");
899                 abort();
900         } else {
901                 bailIfCursor(MYCURSORS,
902                              "attempt to write during r/o cursor");
903
904               retry:
905                 /* txbegin(&tid); */
906
907                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
908                                               NULL,     /* transaction ID */
909                                               &count,   /* #rows deleted */
910                                               0))) {    /* flags */
911                         if (ret == DB_LOCK_DEADLOCK) {
912                                 /* txabort(tid); */
913                                 goto retry;
914                         } else {
915                                 CtdlLogPrintf(CTDL_EMERG,
916                                         "cdb_truncate(%d): %s\n", cdb,
917                                         db_strerror(ret));
918                                 abort();
919                         }
920                 } else {
921                         /* txcommit(tid); */
922                 }
923         }
924 }