Added a mini http fetcher into webcit
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55
56 #include "ctdl_module.h"
57
58
59 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
60 static DB_ENV *dbenv;           /* The DB environment (global) */
61
62
63 #ifdef HAVE_ZLIB
64 #include <zlib.h>
65 #endif
66
67
68 /* Verbose logging callback */
69 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
70 {
71         CtdlLogPrintf(CTDL_DEBUG, "BDB: %s\n", msg);
72 }
73
74
75 /* Verbose logging callback */
76 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
77 {
78         CtdlLogPrintf(CTDL_ALERT, "BDB: %s\n", msg);
79 }
80
81
82 /* just a little helper function */
83 static void txabort(DB_TXN * tid)
84 {
85         int ret;
86
87         ret = tid->abort(tid);
88
89         if (ret) {
90                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
91                         db_strerror(ret));
92                 abort();
93         }
94 }
95
96 /* this one is even more helpful than the last. */
97 static void txcommit(DB_TXN * tid)
98 {
99         int ret;
100
101         ret = tid->commit(tid, 0);
102
103         if (ret) {
104                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n",
105                         db_strerror(ret));
106                 abort();
107         }
108 }
109
110 /* are you sensing a pattern yet? */
111 static void txbegin(DB_TXN ** tid)
112 {
113         int ret;
114
115         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
116
117         if (ret) {
118                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n",
119                         db_strerror(ret));
120                 abort();
121         }
122 }
123
124 static void dbpanic(DB_ENV * env, int errval)
125 {
126         CtdlLogPrintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
127 }
128
129 static void cclose(DBC * cursor)
130 {
131         int ret;
132
133         if ((ret = cursor->c_close(cursor))) {
134                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: c_close: %s\n",
135                         db_strerror(ret));
136                 abort();
137         }
138 }
139
140 static void bailIfCursor(DBC ** cursors, const char *msg)
141 {
142         int i;
143
144         for (i = 0; i < MAXCDB; i++)
145                 if (cursors[i] != NULL) {
146                         CtdlLogPrintf(CTDL_EMERG,
147                                 "cdb_*: cursor still in progress on cdb %d: %s\n",
148                                 i, msg);
149                         abort();
150                 }
151 }
152
153 void check_handles(void *arg)
154 {
155         if (arg != NULL) {
156                 ThreadTSD *tsd = (ThreadTSD *) arg;
157
158                 bailIfCursor(tsd->cursors, "in check_handles");
159
160                 if (tsd->tid != NULL) {
161                         CtdlLogPrintf(CTDL_EMERG,
162                                 "cdb_*: transaction still in progress!");
163                         abort();
164                 }
165         }
166 }
167
168 void cdb_check_handles(void)
169 {
170         check_handles(pthread_getspecific(ThreadKey));
171 }
172
173
174 /*
175  * Cull the database logs
176  */
177 static void cdb_cull_logs(void)
178 {
179         u_int32_t flags;
180         int ret;
181         char **file, **list;
182         char errmsg[SIZ];
183
184         flags = DB_ARCH_ABS;
185
186         /* Get the list of names. */
187         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
188                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
189                 return;
190         }
191
192         /* Print the list of names. */
193         if (list != NULL) {
194                 for (file = list; *file != NULL; ++file) {
195                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
196                         ret = unlink(*file);
197                         if (ret != 0) {
198                                 snprintf(errmsg, sizeof(errmsg),
199                                          " ** ERROR **\n \n \n "
200                                          "Citadel was unable to delete the "
201                                          "database log file '%s' because of the "
202                                          "following error:\n \n %s\n \n"
203                                          " This log file is no longer in use "
204                                          "and may be safely deleted.\n",
205                                          *file, strerror(errno));
206                                 aide_message(errmsg, "Database Warning Message");
207                         }
208                 }
209                 free(list);
210         }
211 }
212
213 /*
214  * Manually initiate log file cull.
215  */
216 void cmd_cull(char *argbuf) {
217         if (CtdlAccessCheck(ac_internal)) return;
218         cdb_cull_logs();
219         cprintf("%d Database log file cull completed.\n", CIT_OK);
220 }
221
222
223 /*
224  * Request a checkpoint of the database.
225  */
226 void cdb_checkpoint(void)
227 {
228         int ret;
229 //      static time_t last_run = 0L;
230
231         /* Only do a checkpoint once per minute. */
232 /*
233  * Don't need this any more, since the thread that calls us sleeps for 60 seconds between calls
234  
235         if ((time(NULL) - last_run) < 60L) {
236                 return;
237         }
238         last_run = time(NULL);
239 */
240
241         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
242         ret = dbenv->txn_checkpoint(dbenv,
243                                     MAX_CHECKPOINT_KBYTES,
244                                     MAX_CHECKPOINT_MINUTES, 0);
245
246         if (ret != 0) {
247                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
248                         db_strerror(ret));
249                 abort();
250         }
251
252         /* After a successful checkpoint, we can cull the unused logs */
253         if (config.c_auto_cull) {
254                 cdb_cull_logs();
255         }
256 }
257
258
259
260 /*
261  * Open the various databases we'll be using.  Any database which
262  * does not exist should be created.  Note that we don't need a
263  * critical section here, because there aren't any active threads
264  * manipulating the database yet.
265  */
266 void open_databases(void)
267 {
268         int ret;
269         int i;
270         char dbfilename[SIZ];
271         u_int32_t flags = 0;
272
273         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
274         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
275         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
276                 db_version(NULL, NULL, NULL));
277 #ifdef HAVE_ZLIB
278         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
279 #endif
280
281         /*
282          * Silently try to create the database subdirectory.  If it's
283          * already there, no problem.
284          */
285         mkdir(ctdl_data_dir, 0700);
286         chmod(ctdl_data_dir, 0700);
287         chown(ctdl_data_dir, CTDLUID, (-1));
288
289         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
290         db_env_set_func_yield(sched_yield);
291         ret = db_env_create(&dbenv, 0);
292         if (ret) {
293                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n",
294                         db_strerror(ret));
295                 exit(CTDLEXIT_DB);
296         }
297         dbenv->set_errpfx(dbenv, "citserver");
298         dbenv->set_paniccall(dbenv, dbpanic);
299         dbenv->set_errcall(dbenv, cdb_verbose_err);
300         dbenv->set_errpfx(dbenv, "ctdl");
301 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
302         dbenv->set_msgcall(dbenv, cdb_verbose_log);
303 #endif
304         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
305         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
306
307         /*
308          * We want to specify the shared memory buffer pool cachesize,
309          * but everything else is the default.
310          */
311         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
312         if (ret) {
313                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n",
314                         db_strerror(ret));
315                 dbenv->close(dbenv, 0);
316                 exit(CTDLEXIT_DB);
317         }
318
319         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
320                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n",
321                         db_strerror(ret));
322                 dbenv->close(dbenv, 0);
323                 exit(CTDLEXIT_DB);
324         }
325
326         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
327         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
328         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
329         if (ret == DB_RUNRECOVERY) {
330                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
331                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
332                 flags |= DB_RECOVER;
333                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
334         }
335         if (ret == DB_RUNRECOVERY) {
336                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
337                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
338                 flags &= ~DB_RECOVER;
339                 flags |= DB_RECOVER_FATAL;
340                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
341         }
342         if (ret) {
343                 CtdlLogPrintf(CTDL_DEBUG, "dbenv->open: %s\n", db_strerror(ret));
344                 dbenv->close(dbenv, 0);
345                 exit(CTDLEXIT_DB);
346         }
347
348         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
349
350         for (i = 0; i < MAXCDB; ++i) {
351
352                 /* Create a database handle */
353                 ret = db_create(&dbp[i], dbenv, 0);
354                 if (ret) {
355                         CtdlLogPrintf(CTDL_DEBUG, "db_create: %s\n",
356                                 db_strerror(ret));
357                         exit(CTDLEXIT_DB);
358                 }
359
360
361                 /* Arbitrary names for our tables -- we reference them by
362                  * number, so we don't have string names for them.
363                  */
364                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
365
366                 ret = dbp[i]->open(dbp[i],
367                                    NULL,
368                                    dbfilename,
369                                    NULL,
370                                    DB_BTREE,
371                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
372                                    0600);
373                 if (ret) {
374                         CtdlLogPrintf(CTDL_EMERG, "db_open[%d]: %s\n", i,
375                                 db_strerror(ret));
376                         exit(CTDLEXIT_DB);
377                 }
378         }
379
380 }
381
382
383 /* Make sure we own all the files, because in a few milliseconds
384  * we're going to drop root privs.
385  */
386 void cdb_chmod_data(void) {
387         DIR *dp;
388         struct dirent *d;
389         char filename[PATH_MAX];
390
391         dp = opendir(ctdl_data_dir);
392         if (dp != NULL) {
393                 while (d = readdir(dp), d != NULL) {
394                         if (d->d_name[0] != '.') {
395                                 snprintf(filename, sizeof filename,
396                                          "%s/%s", ctdl_data_dir, d->d_name);
397                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
398                                         filename, chmod(filename, 0600)
399                                 );
400                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
401                                         filename, chown(filename, CTDLUID, (-1))
402                                 );
403                         }
404                 }
405                 closedir(dp);
406         }
407
408         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
409
410         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
411 }
412
413
414 /*
415  * Close all of the db database files we've opened.  This can be done
416  * in a loop, since it's just a bunch of closes.
417  */
418 void close_databases(void)
419 {
420         int a;
421         int ret;
422
423         ctdl_thread_internal_free_tsd();
424         
425         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
426                 CtdlLogPrintf(CTDL_EMERG,
427                         "txn_checkpoint: %s\n", db_strerror(ret));
428         }
429
430         /* print some statistics... */
431 #ifdef DB_STAT_ALL
432         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
433 #endif
434
435         /* close the tables */
436         for (a = 0; a < MAXCDB; ++a) {
437                 CtdlLogPrintf(CTDL_INFO, "Closing database %d\n", a);
438                 ret = dbp[a]->close(dbp[a], 0);
439                 if (ret) {
440                         CtdlLogPrintf(CTDL_EMERG,
441                                 "db_close: %s\n", db_strerror(ret));
442                 }
443
444         }
445
446         /* Close the handle. */
447         ret = dbenv->close(dbenv, 0);
448         if (ret) {
449                 CtdlLogPrintf(CTDL_EMERG,
450                         "DBENV->close: %s\n", db_strerror(ret));
451         }
452 }
453
454
455 /*
456  * Compression functions only used if we have zlib
457  */
458 #ifdef HAVE_ZLIB
459
460 void cdb_decompress_if_necessary(struct cdbdata *cdb)
461 {
462         static int magic = COMPRESS_MAGIC;
463         struct CtdlCompressHeader zheader;
464         char *uncompressed_data;
465         char *compressed_data;
466         uLongf destLen, sourceLen;
467
468         if (cdb == NULL)
469                 return;
470         if (cdb->ptr == NULL)
471                 return;
472         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
473                 return;
474
475         /* At this point we know we're looking at a compressed item. */
476         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
477
478         compressed_data = cdb->ptr;
479         compressed_data += sizeof(struct CtdlCompressHeader);
480
481         sourceLen = (uLongf) zheader.compressed_len;
482         destLen = (uLongf) zheader.uncompressed_len;
483         uncompressed_data = malloc(zheader.uncompressed_len);
484
485         if (uncompress((Bytef *) uncompressed_data,
486                        (uLongf *) & destLen,
487                        (const Bytef *) compressed_data,
488                        (uLong) sourceLen) != Z_OK) {
489                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
490                 abort();
491         }
492
493         free(cdb->ptr);
494         cdb->len = (size_t) destLen;
495         cdb->ptr = uncompressed_data;
496 }
497
498 #endif                          /* HAVE_ZLIB */
499
500
501 /*
502  * Store a piece of data.  Returns 0 if the operation was successful.  If a
503  * key already exists it should be overwritten.
504  */
505 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
506 {
507
508         DBT dkey, ddata;
509         DB_TXN *tid;
510         int ret = 0;
511
512 #ifdef HAVE_ZLIB
513         struct CtdlCompressHeader zheader;
514         char *compressed_data = NULL;
515         int compressing = 0;
516         size_t buffer_len = 0;
517         uLongf destLen = 0;
518 #endif
519
520         memset(&dkey, 0, sizeof(DBT));
521         memset(&ddata, 0, sizeof(DBT));
522         dkey.size = ckeylen;
523         dkey.data = ckey;
524         ddata.size = cdatalen;
525         ddata.data = cdata;
526
527 #ifdef HAVE_ZLIB
528         /* Only compress Visit records.  Everything else is uncompressed. */
529         if (cdb == CDB_VISIT) {
530                 compressing = 1;
531                 zheader.magic = COMPRESS_MAGIC;
532                 zheader.uncompressed_len = cdatalen;
533                 buffer_len = ((cdatalen * 101) / 100) + 100
534                     + sizeof(struct CtdlCompressHeader);
535                 destLen = (uLongf) buffer_len;
536                 compressed_data = malloc(buffer_len);
537                 if (compress2((Bytef *) (compressed_data +
538                                          sizeof(struct
539                                                 CtdlCompressHeader)),
540                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
541                               1) != Z_OK) {
542                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
543                         abort();
544                 }
545                 zheader.compressed_len = (size_t) destLen;
546                 memcpy(compressed_data, &zheader,
547                        sizeof(struct CtdlCompressHeader));
548                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
549                                        zheader.compressed_len);
550                 ddata.data = compressed_data;
551         }
552 #endif
553
554         if (MYTID != NULL) {
555                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
556                                     MYTID,      /* transaction ID */
557                                     &dkey,      /* key */
558                                     &ddata,     /* data */
559                                     0); /* flags */
560                 if (ret) {
561                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
562                                 db_strerror(ret));
563                         abort();
564                 }
565 #ifdef HAVE_ZLIB
566                 if (compressing)
567                         free(compressed_data);
568 #endif
569                 return ret;
570
571         } else {
572                 bailIfCursor(MYCURSORS,
573                              "attempt to write during r/o cursor");
574
575               retry:
576                 txbegin(&tid);
577
578                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
579                                          tid,   /* transaction ID */
580                                          &dkey, /* key */
581                                          &ddata,        /* data */
582                                          0))) { /* flags */
583                         if (ret == DB_LOCK_DEADLOCK) {
584                                 txabort(tid);
585                                 goto retry;
586                         } else {
587                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
588                                         cdb, db_strerror(ret));
589                                 abort();
590                         }
591                 } else {
592                         txcommit(tid);
593 #ifdef HAVE_ZLIB
594                         if (compressing)
595                                 free(compressed_data);
596 #endif
597                         return ret;
598                 }
599         }
600 }
601
602
603 /*
604  * Delete a piece of data.  Returns 0 if the operation was successful.
605  */
606 int cdb_delete(int cdb, void *key, int keylen)
607 {
608
609         DBT dkey;
610         DB_TXN *tid;
611         int ret;
612
613         memset(&dkey, 0, sizeof dkey);
614         dkey.size = keylen;
615         dkey.data = key;
616
617         if (MYTID != NULL) {
618                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
619                 if (ret) {
620                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
621                                 db_strerror(ret));
622                         if (ret != DB_NOTFOUND)
623                                 abort();
624                 }
625         } else {
626                 bailIfCursor(MYCURSORS,
627                              "attempt to delete during r/o cursor");
628
629               retry:
630                 txbegin(&tid);
631
632                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
633                     && ret != DB_NOTFOUND) {
634                         if (ret == DB_LOCK_DEADLOCK) {
635                                 txabort(tid);
636                                 goto retry;
637                         } else {
638                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
639                                         cdb, db_strerror(ret));
640                                 abort();
641                         }
642                 } else {
643                         txcommit(tid);
644                 }
645         }
646         return ret;
647 }
648
649 static DBC *localcursor(int cdb)
650 {
651         int ret;
652         DBC *curs;
653
654         if (MYCURSORS[cdb] == NULL)
655                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
656         else
657                 ret =
658                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
659                                           DB_POSITION);
660
661         if (ret) {
662                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
663                 abort();
664         }
665
666         return curs;
667 }
668
669
670 /*
671  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
672  * a struct cdbdata which it is the caller's responsibility to free later on
673  * using the cdb_free() routine.
674  */
675 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
676 {
677
678         struct cdbdata *tempcdb;
679         DBT dkey, dret;
680         int ret;
681
682         memset(&dkey, 0, sizeof(DBT));
683         dkey.size = keylen;
684         dkey.data = key;
685
686         if (MYTID != NULL) {
687                 memset(&dret, 0, sizeof(DBT));
688                 dret.flags = DB_DBT_MALLOC;
689                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
690         } else {
691                 DBC *curs;
692
693                 do {
694                         memset(&dret, 0, sizeof(DBT));
695                         dret.flags = DB_DBT_MALLOC;
696
697                         curs = localcursor(cdb);
698
699                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
700                         cclose(curs);
701                 }
702                 while (ret == DB_LOCK_DEADLOCK);
703
704         }
705
706         if ((ret != 0) && (ret != DB_NOTFOUND)) {
707                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
708                         db_strerror(ret));
709                 abort();
710         }
711
712         if (ret != 0)
713                 return NULL;
714         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
715
716         if (tempcdb == NULL) {
717                 CtdlLogPrintf(CTDL_EMERG,
718                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
719                 abort();
720         }
721
722         tempcdb->len = dret.size;
723         tempcdb->ptr = dret.data;
724 #ifdef HAVE_ZLIB
725         cdb_decompress_if_necessary(tempcdb);
726 #endif
727         return (tempcdb);
728 }
729
730
731 /*
732  * Free a cdbdata item.
733  *
734  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
735  * other code to assume ownership of that memory simply by storing the
736  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
737  * avoid freeing it.
738  */
739 void cdb_free(struct cdbdata *cdb)
740 {
741         if (cdb->ptr) {
742                 free(cdb->ptr);
743         }
744         free(cdb);
745 }
746
747 void cdb_close_cursor(int cdb)
748 {
749         if (MYCURSORS[cdb] != NULL)
750                 cclose(MYCURSORS[cdb]);
751
752         MYCURSORS[cdb] = NULL;
753 }
754
755 /* 
756  * Prepare for a sequential search of an entire database.
757  * (There is guaranteed to be no more than one traversal in
758  * progress per thread at any given time.)
759  */
760 void cdb_rewind(int cdb)
761 {
762         int ret = 0;
763
764         if (MYCURSORS[cdb] != NULL) {
765                 CtdlLogPrintf(CTDL_EMERG,
766                         "cdb_rewind: must close cursor on database %d before reopening.\n",
767                         cdb);
768                 abort();
769                 /* cclose(MYCURSORS[cdb]); */
770         }
771
772         /*
773          * Now initialize the cursor
774          */
775         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
776         if (ret) {
777                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
778                         db_strerror(ret));
779                 abort();
780         }
781 }
782
783
784 /*
785  * Fetch the next item in a sequential search.  Returns a pointer to a 
786  * cdbdata structure, or NULL if we've hit the end.
787  */
788 struct cdbdata *cdb_next_item(int cdb)
789 {
790         DBT key, data;
791         struct cdbdata *cdbret;
792         int ret = 0;
793
794         /* Initialize the key/data pair so the flags aren't set. */
795         memset(&key, 0, sizeof(key));
796         memset(&data, 0, sizeof(data));
797         data.flags = DB_DBT_MALLOC;
798
799         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
800
801         if (ret) {
802                 if (ret != DB_NOTFOUND) {
803                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
804                                 cdb, db_strerror(ret));
805                         abort();
806                 }
807                 cclose(MYCURSORS[cdb]);
808                 MYCURSORS[cdb] = NULL;
809                 return NULL;    /* presumably, end of file */
810         }
811
812         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
813         cdbret->len = data.size;
814         cdbret->ptr = data.data;
815 #ifdef HAVE_ZLIB
816         cdb_decompress_if_necessary(cdbret);
817 #endif
818
819         return (cdbret);
820 }
821
822
823
824 /*
825  * Transaction-based stuff.  I'm writing this as I bake cookies...
826  */
827
828 void cdb_begin_transaction(void)
829 {
830
831         bailIfCursor(MYCURSORS,
832                      "can't begin transaction during r/o cursor");
833
834         if (MYTID != NULL) {
835                 CtdlLogPrintf(CTDL_EMERG,
836                         "cdb_begin_transaction: ERROR: nested transaction\n");
837                 abort();
838         }
839
840         txbegin(&MYTID);
841 }
842
843 void cdb_end_transaction(void)
844 {
845         int i;
846
847         for (i = 0; i < MAXCDB; i++)
848                 if (MYCURSORS[i] != NULL) {
849                         CtdlLogPrintf(CTDL_WARNING,
850                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
851                                 i);
852                         cclose(MYCURSORS[i]);
853                         MYCURSORS[i] = NULL;
854                 }
855
856         if (MYTID == NULL) {
857                 CtdlLogPrintf(CTDL_EMERG,
858                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
859                 abort();
860         } else
861                 txcommit(MYTID);
862
863         MYTID = NULL;
864 }
865
866 /*
867  * Truncate (delete every record)
868  */
869 void cdb_trunc(int cdb)
870 {
871         /* DB_TXN *tid; */
872         int ret;
873         u_int32_t count;
874
875         if (MYTID != NULL) {
876                 CtdlLogPrintf(CTDL_EMERG,
877                         "cdb_trunc must not be called in a transaction.\n");
878                 abort();
879         } else {
880                 bailIfCursor(MYCURSORS,
881                              "attempt to write during r/o cursor");
882
883               retry:
884                 /* txbegin(&tid); */
885
886                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
887                                               NULL,     /* transaction ID */
888                                               &count,   /* #rows deleted */
889                                               0))) {    /* flags */
890                         if (ret == DB_LOCK_DEADLOCK) {
891                                 /* txabort(tid); */
892                                 goto retry;
893                         } else {
894                                 CtdlLogPrintf(CTDL_EMERG,
895                                         "cdb_truncate(%d): %s\n", cdb,
896                                         db_strerror(ret));
897                                 abort();
898                         }
899                 } else {
900                         /* txcommit(tid); */
901                 }
902         }
903 }