]> code.citadel.org Git - citadel.git/blob - citadel/database_sleepycat.c
5f1f112ecebe16ca3c146c66b0ca3240ee4e17fa
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #ifdef DLL_EXPORT
22 #define IN_LIBCIT
23 #endif
24
25 #include "sysdep.h"
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <stdio.h>
29 #include <ctype.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <dirent.h>
35
36 #ifdef HAVE_DB_H
37 #include <db.h>
38 #elif defined(HAVE_DB4_DB_H)
39 #include <db4/db.h>
40 #else
41 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
42 #endif
43
44
45 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
46 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
47 #endif
48
49
50 #include <pthread.h>
51 #include "citadel.h"
52 #include "server.h"
53 #include "serv_extensions.h"
54 #include "citserver.h"
55 #include "database.h"
56 #include "msgbase.h"
57 #include "sysdep_decls.h"
58 #include "config.h"
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63 struct cdbtsd {                 /* Thread-specific DB stuff */
64         DB_TXN *tid;            /* Transaction handle */
65         DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
66 };
67
68 #ifdef HAVE_ZLIB
69 #include <zlib.h>
70 #endif
71
72 static pthread_key_t tsdkey;
73
74 #define MYCURSORS       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors)
75 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
76
77 /* just a little helper function */
78 static void txabort(DB_TXN *tid) {
79         int ret;
80
81         ret = tid->abort(tid);
82
83         if (ret) {
84                 lprintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n", db_strerror(ret));
85                 abort();
86         }
87 }
88
89 /* this one is even more helpful than the last. */
90 static void txcommit(DB_TXN *tid) {
91         int ret;
92
93         ret = tid->commit(tid, 0);
94
95         if (ret) {
96                 lprintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n", db_strerror(ret));
97                 abort();
98         }
99 }
100
101 /* are you sensing a pattern yet? */
102 static void txbegin(DB_TXN **tid) {
103         int ret;
104
105         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
106
107         if (ret) {
108                 lprintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n", db_strerror(ret));
109                 abort();
110         }
111 }
112
113 static void dbpanic(DB_ENV* env, int errval)
114 {
115         lprintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
116 }
117
118 static void cclose(DBC *cursor) {
119         int ret;
120
121         if ((ret = cursor->c_close(cursor))) {
122                 lprintf(CTDL_EMERG, "cdb_*: c_close: %s\n", db_strerror(ret));
123                 abort();
124         }
125 }
126
127 static void bailIfCursor(DBC **cursors, const char *msg)
128 {
129   int i;
130
131   for (i = 0; i < MAXCDB; i++)
132     if (cursors[i] != NULL)
133       {
134         lprintf(CTDL_EMERG, "cdb_*: cursor still in progress on cdb %d: %s\n", i, msg);
135         abort();
136       }
137 }
138
139 static void check_handles(void *arg) {
140         if (arg != NULL) {
141                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
142
143                 bailIfCursor(tsd->cursors, "in check_handles");
144
145                 if (tsd->tid != NULL) {
146                         lprintf(CTDL_EMERG, "cdb_*: transaction still in progress!");
147                         abort();
148                 }
149         }
150 }
151
152 static void dest_tsd(void *arg) {
153         if (arg != NULL) {
154                 check_handles(arg);
155                 free(arg);
156         }
157 }
158
159 /*
160  * Ensure that we have a key for thread-specific data.  We don't
161  * put anything in here that Citadel cares about; this is just database
162  * related stuff like cursors and transactions.
163  *
164  * This should be called immediately after startup by any thread which wants
165  * to use database calls, except for whatever thread calls open_databases.
166  */
167 void cdb_allocate_tsd(void) {
168         struct cdbtsd *tsd;
169
170         if (pthread_getspecific(tsdkey) != NULL)
171                 return;
172
173         tsd = malloc(sizeof(struct cdbtsd));
174
175         tsd->tid = NULL;
176
177         memset(tsd->cursors, 0, sizeof tsd->cursors);
178         pthread_setspecific(tsdkey, tsd);
179 }
180
181 void cdb_free_tsd(void) {
182         dest_tsd(pthread_getspecific(tsdkey));
183         pthread_setspecific(tsdkey, NULL);
184 }
185
186 void cdb_check_handles(void) {
187         check_handles(pthread_getspecific(tsdkey));
188 }
189
190
191 /*
192  * Reclaim unused space in the databases.  We need to do each one of
193  * these discretely, rather than in a loop.
194  *
195  * This is a stub function in the Sleepycat DB backend, because there is no
196  * such API call available.
197  */
198 void defrag_databases(void)
199 {
200         /* do nothing */
201 }
202
203
204 /*
205  * Cull the database logs
206  */
207 static void cdb_cull_logs(void) {
208         u_int32_t flags;
209         int ret;
210         char **file, **list;
211         char errmsg[SIZ];
212
213         flags = DB_ARCH_ABS;
214
215         /* Get the list of names. */
216         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
217                 lprintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
218                 return;
219         }
220
221         /* Print the list of names. */
222         if (list != NULL) {
223                 for (file = list; *file != NULL; ++file) {
224                         lprintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
225                         ret = unlink(*file);
226                         if (ret != 0) {
227                                 snprintf(errmsg, sizeof(errmsg),
228                                         " ** ERROR **\n \n \n "
229                                         "Citadel was unable to delete the "
230                                         "database log file '%s' because of the "
231                                         "following error:\n \n %s\n \n"
232                                         " This log file is no longer in use "
233                                         "and may be safely deleted.\n",
234                                         *file,
235                                         strerror(errno));
236                                 aide_message(errmsg);
237                         }
238                 }
239                 free(list);
240         }
241
242         lprintf(CTDL_INFO, "Database log file cull ended.\n");
243 }
244
245
246 /*
247  * Request a checkpoint of the database.
248  */
249 static void cdb_checkpoint(void) {
250         int ret;
251         /* static time_t last_cull = 0L; */
252
253         ret = dbenv->txn_checkpoint(dbenv,
254                                 MAX_CHECKPOINT_KBYTES,
255                                 MAX_CHECKPOINT_MINUTES,
256                                 0);
257
258         if (ret != 0) {
259                 lprintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
260                         db_strerror(ret));
261                 abort();
262         }
263
264         /* After a successful checkpoint, we can cull the unused logs */
265         cdb_cull_logs();
266
267         /* Cull the logs if we haven't done so for 24 hours
268         if ((time(NULL) - last_cull) > 86400L) {
269                 last_cull = time(NULL);
270                 cdb_cull_logs();
271         } */
272
273 }
274
275 /*
276  * Open the various databases we'll be using.  Any database which
277  * does not exist should be created.  Note that we don't need a
278  * critical section here, because there aren't any active threads
279  * manipulating the database yet.
280  */
281 void open_databases(void)
282 {
283         int ret;
284         int i;
285         char dbfilename[SIZ];
286         u_int32_t flags = 0;
287         char dbdirname[PATH_MAX];
288         DIR *dp;
289         struct dirent *d;
290         char filename[PATH_MAX];
291
292
293         getcwd(dbdirname, sizeof dbdirname);
294         strcat(dbdirname, "/data");
295
296         lprintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
297         lprintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
298         lprintf(CTDL_INFO, "  Linked db: %s\n", db_version(NULL, NULL, NULL));
299 #ifdef HAVE_ZLIB
300         lprintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
301 #endif
302
303         /*
304          * Silently try to create the database subdirectory.  If it's
305          * already there, no problem.
306          */
307         mkdir(dbdirname, 0700);
308         chmod(dbdirname, 0700);
309         chown(dbdirname, CTDLUID, (-1) );
310
311         lprintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
312         db_env_set_func_yield(sched_yield);
313         ret = db_env_create(&dbenv, 0);
314         if (ret) {
315                 lprintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n", db_strerror(ret));
316                 exit(ret);
317         }
318         dbenv->set_errpfx(dbenv, "citserver");
319         dbenv->set_paniccall(dbenv, dbpanic);
320
321         /*
322          * We want to specify the shared memory buffer pool cachesize,
323          * but everything else is the default.
324          */
325         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
326         if (ret) {
327                 lprintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
328                 dbenv->close(dbenv, 0);
329                 exit(ret);
330         }
331
332         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
333                 lprintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
334                 dbenv->close(dbenv, 0);
335                 exit(ret);
336         }
337
338         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
339                 DB_INIT_LOCK|DB_THREAD;
340         lprintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", dbdirname, flags);
341         ret = dbenv->open(dbenv, dbdirname, flags, 0);
342         if (ret) {
343                 lprintf(CTDL_DEBUG, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
344                 dbenv->close(dbenv, 0);
345                 exit(ret);
346         }
347
348         lprintf(CTDL_INFO, "cdb_*: Starting up DB\n");
349
350         for (i = 0; i < MAXCDB; ++i) {
351
352                 /* Create a database handle */
353                 ret = db_create(&dbp[i], dbenv, 0);
354                 if (ret) {
355                         lprintf(CTDL_DEBUG, "cdb_*: db_create: %s\n", db_strerror(ret));
356                         exit(ret);
357                 }
358
359
360                 /* Arbitrary names for our tables -- we reference them by
361                  * number, so we don't have string names for them.
362                  */
363                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
364
365                 ret = dbp[i]->open(dbp[i],
366                                 NULL,
367                                 dbfilename,
368                                 NULL,
369                                 DB_BTREE,
370                                 DB_CREATE|DB_AUTO_COMMIT|DB_THREAD
371                                 ,
372                                 0600);
373                 if (ret) {
374                         lprintf(CTDL_EMERG, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
375                         exit(ret);
376                 }
377         }
378
379         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
380                 lprintf(CTDL_EMERG, "cdb_*: pthread_key_create: %s\n", strerror(ret));
381                 exit(1);
382         }
383
384         cdb_allocate_tsd();
385         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
386
387         /* Now make sure we own all the files, because in a few milliseconds
388          * we're going to drop root privs.
389          */
390         dp = opendir(dbdirname);
391         if (dp != NULL) {
392                 while (d = readdir(dp), d != NULL) {
393                         if (d->d_name[0] != '.') {
394                                 snprintf(filename, sizeof filename, "%s/%s",
395                                         dbdirname, d->d_name);
396                                 chmod(filename, 0600);
397                                 chown(filename, CTDLUID, (-1) );
398                         }
399                 }
400                 closedir(dp);
401         }
402
403         lprintf(CTDL_DEBUG, "cdb_*: open_databases() finished\n");
404 }
405
406
407 /*
408  * Close all of the db database files we've opened.  This can be done
409  * in a loop, since it's just a bunch of closes.
410  */
411 void close_databases(void)
412 {
413         int a;
414         int ret;
415
416         cdb_free_tsd();
417
418         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
419                 lprintf(CTDL_EMERG,
420                         "cdb_*: txn_checkpoint: %s\n",
421                         db_strerror(ret));
422         }
423
424         for (a = 0; a < MAXCDB; ++a) {
425                 lprintf(CTDL_INFO, "cdb_*: Closing database %d\n", a);
426                 ret = dbp[a]->close(dbp[a], 0);
427                 if (ret) {
428                         lprintf(CTDL_EMERG,
429                                 "cdb_*: db_close: %s\n",
430                                 db_strerror(ret));
431                 }
432                 
433         }
434
435         /* Close the handle. */
436         ret = dbenv->close(dbenv, 0);
437         if (ret) {
438                 lprintf(CTDL_EMERG,
439                         "cdb_*: DBENV->close: %s\n",
440                         db_strerror(ret));
441         }
442 }
443
444
445 /*
446  * Compression functions only used if we have zlib
447  */
448 #ifdef HAVE_ZLIB
449
450 void cdb_decompress_if_necessary(struct cdbdata *cdb) {
451         static int magic = COMPRESS_MAGIC;
452         struct CtdlCompressHeader zheader;
453         char *uncompressed_data;
454         char *compressed_data;
455         uLongf destLen, sourceLen;
456
457         if (cdb == NULL) return;
458         if (cdb->ptr == NULL) return;
459         if (memcmp(cdb->ptr, &magic, sizeof(magic))) return;
460
461         /* At this point we know we're looking at a compressed item. */
462         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
463
464         compressed_data = cdb->ptr;
465         compressed_data += sizeof(struct CtdlCompressHeader);
466
467         sourceLen = (uLongf) zheader.compressed_len;
468         destLen = (uLongf) zheader.uncompressed_len;
469         uncompressed_data = malloc(zheader.uncompressed_len);
470
471         if (uncompress( (Bytef *) uncompressed_data,
472                         (uLongf *)&destLen,
473                         (const Bytef *)compressed_data,
474                         (uLong)sourceLen
475         ) != Z_OK) {
476                 lprintf(CTDL_EMERG, "uncompress() error\n");
477                 abort();
478         }
479
480         free(cdb->ptr);
481         cdb->len = (size_t) destLen;
482         cdb->ptr = uncompressed_data;
483 }
484
485 #endif  /* HAVE_ZLIB */
486         
487
488 /*
489  * Store a piece of data.  Returns 0 if the operation was successful.  If a
490  * key already exists it should be overwritten.
491  */
492 int cdb_store(int cdb,
493               void *ckey, int ckeylen,
494               void *cdata, int cdatalen)
495 {
496
497   DBT dkey, ddata;
498   DB_TXN *tid;
499   int ret;
500
501 #ifdef HAVE_ZLIB
502         struct CtdlCompressHeader zheader;
503         char *compressed_data = NULL;
504         int compressing = 0;
505         size_t buffer_len;
506         uLongf destLen;
507 #endif
508   
509   memset(&dkey, 0, sizeof(DBT));
510   memset(&ddata, 0, sizeof(DBT));
511   dkey.size = ckeylen;
512   dkey.data = ckey;
513   ddata.size = cdatalen;
514   ddata.data = cdata;
515
516 #ifdef HAVE_ZLIB
517         /* Only compress Visit records.  Everything else is uncompressed. */
518         if (cdb == CDB_VISIT) {
519                 compressing = 1;
520                 zheader.magic = COMPRESS_MAGIC;
521                 zheader.uncompressed_len = cdatalen;
522                 buffer_len = ( (cdatalen * 101) / 100 ) + 100
523                                 + sizeof(struct CtdlCompressHeader) ;
524                 destLen = (uLongf) buffer_len;
525                 compressed_data = malloc(buffer_len);
526                 if (compress2(
527                         (Bytef *) (compressed_data +
528                                         sizeof(struct CtdlCompressHeader)),
529                         &destLen,
530                         (Bytef *) cdata,
531                         (uLongf) cdatalen,
532                         1
533                 ) != Z_OK) {
534                         lprintf(CTDL_EMERG, "compress2() error\n");
535                         abort();
536                 }
537                 zheader.compressed_len = (size_t) destLen;
538                 memcpy(compressed_data, &zheader,
539                         sizeof(struct CtdlCompressHeader));
540                 ddata.size = (size_t)  (sizeof(struct CtdlCompressHeader) +
541                                                 zheader.compressed_len);
542                 ddata.data = compressed_data;
543         }
544 #endif
545   
546   if (MYTID != NULL)
547     {
548       ret = dbp[cdb]->put(dbp[cdb],             /* db */
549                           MYTID,                /* transaction ID */
550                           &dkey,                /* key */
551                           &ddata,               /* data */
552                           0);           /* flags */
553       if (ret)
554         {
555           lprintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
556           abort();
557         }
558 #ifdef HAVE_ZLIB
559       if (compressing) free(compressed_data);
560 #endif
561       return ret;
562       
563     }
564   else
565     {
566       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
567       
568     retry:
569       txbegin(&tid);
570       
571       if ((ret = dbp[cdb]->put(dbp[cdb],    /* db */
572                                tid,         /* transaction ID */
573                                &dkey,       /* key */
574                                &ddata,      /* data */
575                                0)))         /* flags */
576         {
577           if (ret == DB_LOCK_DEADLOCK)
578             {
579               txabort(tid);
580               goto retry;
581             }
582           else
583             {
584               lprintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
585               abort();
586             }
587         }
588       else
589         {
590           txcommit(tid);
591 #ifdef HAVE_ZLIB
592           if (compressing) free(compressed_data);
593 #endif
594           return ret;
595         }
596     }
597 }
598
599
600 /*
601  * Delete a piece of data.  Returns 0 if the operation was successful.
602  */
603 int cdb_delete(int cdb, void *key, int keylen)
604 {
605
606   DBT dkey;
607   DB_TXN *tid;
608   int ret;
609
610   memset(&dkey, 0, sizeof dkey);
611   dkey.size = keylen;
612   dkey.data = key;
613
614   if (MYTID != NULL)
615     {
616       ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
617       if (ret)
618         {
619           lprintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
620           if (ret != DB_NOTFOUND)
621             abort();
622         }
623     }
624   else
625     {
626       bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
627     
628     retry:
629       txbegin(&tid);
630     
631       if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
632           && ret != DB_NOTFOUND)
633         {
634           if (ret == DB_LOCK_DEADLOCK)
635             {
636               txabort(tid);
637               goto retry;
638             }
639           else
640             {
641               lprintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
642               abort();
643             }
644         }
645       else
646         {
647           txcommit(tid);
648         }
649     }
650   return ret;
651 }
652
653 static DBC *localcursor(int cdb)
654 {
655   int ret;
656   DBC *curs;
657
658   if (MYCURSORS[cdb] == NULL)
659     ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
660   else
661     ret = MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, DB_POSITION);
662
663   if (ret)
664     {
665       lprintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
666       abort();
667     }
668
669   return curs;
670 }
671
672
673 /*
674  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
675  * a struct cdbdata which it is the caller's responsibility to free later on
676  * using the cdb_free() routine.
677  */
678 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
679 {
680
681   struct cdbdata *tempcdb;
682   DBT dkey, dret;
683   int ret;
684
685   memset(&dkey, 0, sizeof(DBT));
686   dkey.size = keylen;
687   dkey.data = key;
688
689   if (MYTID != NULL)
690     {
691       memset(&dret, 0, sizeof(DBT));
692       dret.flags = DB_DBT_MALLOC;
693       ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
694     }
695   else
696     {
697       DBC *curs;
698
699       do
700         {
701           memset(&dret, 0, sizeof(DBT));
702           dret.flags = DB_DBT_MALLOC;
703
704           curs = localcursor(cdb);
705
706           ret = curs->c_get(curs, &dkey, &dret, DB_SET);
707           cclose(curs);
708         }
709       while (ret == DB_LOCK_DEADLOCK);
710
711     }
712
713   if ((ret != 0) && (ret != DB_NOTFOUND))
714     {
715       lprintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
716       abort();
717     }
718
719   if (ret != 0) return NULL;
720   tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
721
722   if (tempcdb == NULL)
723     {
724       lprintf(CTDL_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
725       abort();
726     }
727
728   tempcdb->len = dret.size;
729   tempcdb->ptr = dret.data;
730 #ifdef HAVE_ZLIB
731   cdb_decompress_if_necessary(tempcdb);
732 #endif
733   return (tempcdb);
734 }
735
736
737 /*
738  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
739  * more complex stuff with other database managers in the future).
740  */
741 void cdb_free(struct cdbdata *cdb)
742 {
743         free(cdb->ptr);
744         free(cdb);
745 }
746
747 void cdb_close_cursor(int cdb)
748 {
749         if (MYCURSORS[cdb] != NULL)
750                 cclose(MYCURSORS[cdb]);
751
752         MYCURSORS[cdb] = NULL;
753 }
754
755 /* 
756  * Prepare for a sequential search of an entire database.
757  * (There is guaranteed to be no more than one traversal in
758  * progress per thread at any given time.)
759  */
760 void cdb_rewind(int cdb)
761 {
762         int ret = 0;
763
764         if (MYCURSORS[cdb] != NULL) {
765                 lprintf(CTDL_EMERG, "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
766                 abort();
767                 /* cclose(MYCURSORS[cdb]); */
768         }
769
770         /*
771          * Now initialize the cursor
772          */
773         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
774         if (ret) {
775                 lprintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
776                 abort();
777         }
778 }
779
780
781 /*
782  * Fetch the next item in a sequential search.  Returns a pointer to a 
783  * cdbdata structure, or NULL if we've hit the end.
784  */
785 struct cdbdata *cdb_next_item(int cdb)
786 {
787         DBT key, data;
788         struct cdbdata *cdbret;
789         int ret = 0;
790
791         /* Initialize the key/data pair so the flags aren't set. */
792         memset(&key, 0, sizeof(key));
793         memset(&data, 0, sizeof(data));
794         data.flags = DB_DBT_MALLOC;
795
796         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb],
797                 &key, &data, DB_NEXT);
798         
799         if (ret) {
800                 if (ret != DB_NOTFOUND) {
801                         lprintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
802                                 cdb, db_strerror(ret));
803                         abort();
804                 }
805                 cclose(MYCURSORS[cdb]);
806                 MYCURSORS[cdb] = NULL;
807                 return NULL;            /* presumably, end of file */
808         }
809
810         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
811         cdbret->len = data.size;
812         cdbret->ptr = data.data;
813 #ifdef HAVE_ZLIB
814         cdb_decompress_if_necessary(cdbret);
815 #endif
816
817         return (cdbret);
818 }
819
820
821
822 /*
823  * Transaction-based stuff.  I'm writing this as I bake cookies...
824  */
825
826 void cdb_begin_transaction(void) {
827
828   bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
829
830   if (MYTID != NULL)
831     {
832       lprintf(CTDL_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
833       abort();
834     }
835
836   txbegin(&MYTID);
837 }
838
839 void cdb_end_transaction(void) {
840   int i;
841
842   for (i = 0; i < MAXCDB; i++)
843     if (MYCURSORS[i] != NULL) {
844       lprintf(CTDL_WARNING, "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n", i);
845       cclose(MYCURSORS[i]);
846       MYCURSORS[i] = NULL;
847     }
848
849   if (MYTID == NULL)
850     {
851       lprintf(CTDL_EMERG, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
852       abort();
853     }
854   else
855     txcommit(MYTID);
856
857   MYTID = NULL;
858 }
859
860 /*
861  * Truncate (delete every record)
862  */
863 void cdb_trunc(int cdb)
864 {
865   DB_TXN *tid;
866   int ret;
867   u_int32_t count;
868   
869   if (MYTID != NULL)
870     {
871       lprintf(CTDL_EMERG, "cdb_trunc must not be called in a transaction.\n");
872       abort();
873     }
874   else
875     {
876       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
877       
878     retry:
879       txbegin(&tid);
880       
881       if ((ret = dbp[cdb]->truncate(dbp[cdb],    /* db */
882                                     tid,         /* transaction ID */
883                                     &count,      /* #rows deleted */
884                                     0)))         /* flags */
885         {
886           if (ret == DB_LOCK_DEADLOCK)
887             {
888               txabort(tid);
889               goto retry;
890             }
891           else
892             {
893               lprintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
894               abort();
895             }
896         }
897       else
898         {
899           txcommit(tid);
900         }
901     }
902 }