* When we encounter a compressed database record on a citserver built without compres...
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Berkeley DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55 #include "control.h"
56
57 #include "ctdl_module.h"
58
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
71 {
72         if (!IsEmptyStr(msg)) {
73                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
74         }
75 }
76
77
78 /* Verbose logging callback */
79 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
80 {
81         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
82 }
83
84
85 /* just a little helper function */
86 static void txabort(DB_TXN * tid)
87 {
88         int ret;
89
90         ret = tid->abort(tid);
91
92         if (ret) {
93                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
94                         db_strerror(ret));
95                 abort();
96         }
97 }
98
99 /* this one is even more helpful than the last. */
100 static void txcommit(DB_TXN * tid)
101 {
102         int ret;
103
104         ret = tid->commit(tid, 0);
105
106         if (ret) {
107                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n",
108                         db_strerror(ret));
109                 abort();
110         }
111 }
112
113 /* are you sensing a pattern yet? */
114 static void txbegin(DB_TXN ** tid)
115 {
116         int ret;
117
118         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
119
120         if (ret) {
121                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n",
122                         db_strerror(ret));
123                 abort();
124         }
125 }
126
127 static void dbpanic(DB_ENV * env, int errval)
128 {
129         CtdlLogPrintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
130 }
131
132 static void cclose(DBC * cursor)
133 {
134         int ret;
135
136         if ((ret = cursor->c_close(cursor))) {
137                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: c_close: %s\n",
138                         db_strerror(ret));
139                 abort();
140         }
141 }
142
143 static void bailIfCursor(DBC ** cursors, const char *msg)
144 {
145         int i;
146
147         for (i = 0; i < MAXCDB; i++)
148                 if (cursors[i] != NULL) {
149                         CtdlLogPrintf(CTDL_EMERG,
150                                 "cdb_*: cursor still in progress on cdb %02x: %s\n",
151                                 i, msg);
152                         abort();
153                 }
154 }
155
156 void check_handles(void *arg)
157 {
158         if (arg != NULL) {
159                 ThreadTSD *tsd = (ThreadTSD *) arg;
160
161                 bailIfCursor(tsd->cursors, "in check_handles");
162
163                 if (tsd->tid != NULL) {
164                         CtdlLogPrintf(CTDL_EMERG,
165                                 "cdb_*: transaction still in progress!");
166                         abort();
167                 }
168         }
169 }
170
171 void cdb_check_handles(void)
172 {
173         check_handles(pthread_getspecific(ThreadKey));
174 }
175
176
177 /*
178  * Cull the database logs
179  */
180 static void cdb_cull_logs(void)
181 {
182         u_int32_t flags;
183         int ret;
184         char **file, **list;
185         char errmsg[SIZ];
186
187         flags = DB_ARCH_ABS;
188
189         /* Get the list of names. */
190         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
191                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
192                 return;
193         }
194
195         /* Print the list of names. */
196         if (list != NULL) {
197                 for (file = list; *file != NULL; ++file) {
198                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
199                         ret = unlink(*file);
200                         if (ret != 0) {
201                                 snprintf(errmsg, sizeof(errmsg),
202                                          " ** ERROR **\n \n \n "
203                                          "Citadel was unable to delete the "
204                                          "database log file '%s' because of the "
205                                          "following error:\n \n %s\n \n"
206                                          " This log file is no longer in use "
207                                          "and may be safely deleted.\n",
208                                          *file, strerror(errno));
209                                 aide_message(errmsg, "Database Warning Message");
210                         }
211                 }
212                 free(list);
213         }
214 }
215
216 /*
217  * Manually initiate log file cull.
218  */
219 void cmd_cull(char *argbuf) {
220         if (CtdlAccessCheck(ac_internal)) return;
221         cdb_cull_logs();
222         cprintf("%d Database log file cull completed.\n", CIT_OK);
223 }
224
225
226 /*
227  * Request a checkpoint of the database.  Called once per minute by the thread manager.
228  */
229 void cdb_checkpoint(void)
230 {
231         int ret;
232
233         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
234         ret = dbenv->txn_checkpoint(dbenv,
235                                     MAX_CHECKPOINT_KBYTES,
236                                     MAX_CHECKPOINT_MINUTES, 0);
237
238         if (ret != 0) {
239                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
240                         db_strerror(ret));
241                 abort();
242         }
243
244         /* After a successful checkpoint, we can cull the unused logs */
245         if (config.c_auto_cull) {
246                 cdb_cull_logs();
247         }
248 }
249
250
251
252 /*
253  * Open the various databases we'll be using.  Any database which
254  * does not exist should be created.  Note that we don't need a
255  * critical section here, because there aren't any active threads
256  * manipulating the database yet.
257  */
258 void open_databases(void)
259 {
260         int ret;
261         int i;
262         char dbfilename[32];
263         u_int32_t flags = 0;
264         int dbversion_major, dbversion_minor, dbversion_patch;
265         int current_dbversion = 0;
266
267         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
268         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
269         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
270                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
271
272         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
273
274         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
275         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
276
277         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
278            && (CitControl.MMdbversion > current_dbversion) ) {
279                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
280                                         "of Berkeley DB that is older than that which last created or\n"
281                                         "updated the database.  Because this would probably cause data\n"
282                                         "corruption or loss, the server is aborting execution now.\n");
283                 exit(CTDLEXIT_DB);
284         }
285
286         CitControl.MMdbversion = current_dbversion;
287         put_control();
288
289 #ifdef HAVE_ZLIB
290         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
291 #endif
292
293         /*
294          * Silently try to create the database subdirectory.  If it's
295          * already there, no problem.
296          */
297         mkdir(ctdl_data_dir, 0700);
298         chmod(ctdl_data_dir, 0700);
299         chown(ctdl_data_dir, CTDLUID, (-1));
300
301         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
302         db_env_set_func_yield(sched_yield);
303         ret = db_env_create(&dbenv, 0);
304         if (ret) {
305                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n", db_strerror(ret));
306                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
307                 exit(CTDLEXIT_DB);
308         }
309         dbenv->set_errpfx(dbenv, "citserver");
310         dbenv->set_paniccall(dbenv, dbpanic);
311         dbenv->set_errcall(dbenv, cdb_verbose_err);
312         dbenv->set_errpfx(dbenv, "ctdl");
313 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
314         dbenv->set_msgcall(dbenv, cdb_verbose_log);
315 #endif
316         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
317         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
318
319         /*
320          * We want to specify the shared memory buffer pool cachesize,
321          * but everything else is the default.
322          */
323         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
324         if (ret) {
325                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
326                 dbenv->close(dbenv, 0);
327                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
328                 exit(CTDLEXIT_DB);
329         }
330
331         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
332                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
333                 dbenv->close(dbenv, 0);
334                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
335                 exit(CTDLEXIT_DB);
336         }
337
338         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
339         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
340         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
341         if (ret == DB_RUNRECOVERY) {
342                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
343                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
344                 flags |= DB_RECOVER;
345                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
346         }
347         if (ret == DB_RUNRECOVERY) {
348                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
349                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
350                 flags &= ~DB_RECOVER;
351                 flags |= DB_RECOVER_FATAL;
352                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
353         }
354         if (ret) {
355                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
356                 dbenv->close(dbenv, 0);
357                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
358                 exit(CTDLEXIT_DB);
359         }
360
361         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
362
363         for (i = 0; i < MAXCDB; ++i) {
364
365                 /* Create a database handle */
366                 ret = db_create(&dbp[i], dbenv, 0);
367                 if (ret) {
368                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
369                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
370                         exit(CTDLEXIT_DB);
371                 }
372
373
374                 /* Arbitrary names for our tables -- we reference them by
375                  * number, so we don't have string names for them.
376                  */
377                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
378
379                 ret = dbp[i]->open(dbp[i],
380                                    NULL,
381                                    dbfilename,
382                                    NULL,
383                                    DB_BTREE,
384                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
385                                    0600);
386                 if (ret) {
387                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
388                         if (ret == ENOMEM) {
389                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
390                         }
391                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
392                         exit(CTDLEXIT_DB);
393                 }
394         }
395
396 }
397
398
399 /* Make sure we own all the files, because in a few milliseconds
400  * we're going to drop root privs.
401  */
402 void cdb_chmod_data(void) {
403         DIR *dp;
404         struct dirent *d;
405         char filename[PATH_MAX];
406
407         dp = opendir(ctdl_data_dir);
408         if (dp != NULL) {
409                 while (d = readdir(dp), d != NULL) {
410                         if (d->d_name[0] != '.') {
411                                 snprintf(filename, sizeof filename,
412                                          "%s/%s", ctdl_data_dir, d->d_name);
413                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
414                                         filename, chmod(filename, 0600)
415                                 );
416                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
417                                         filename, chown(filename, CTDLUID, (-1))
418                                 );
419                         }
420                 }
421                 closedir(dp);
422         }
423
424         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
425
426         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
427 }
428
429
430 /*
431  * Close all of the db database files we've opened.  This can be done
432  * in a loop, since it's just a bunch of closes.
433  */
434 void close_databases(void)
435 {
436         int a;
437         int ret;
438
439         ctdl_thread_internal_free_tsd();
440         
441         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
442                 CtdlLogPrintf(CTDL_EMERG,
443                         "txn_checkpoint: %s\n", db_strerror(ret));
444         }
445
446         /* print some statistics... */
447 #ifdef DB_STAT_ALL
448         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
449 #endif
450
451         /* close the tables */
452         for (a = 0; a < MAXCDB; ++a) {
453                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
454                 ret = dbp[a]->close(dbp[a], 0);
455                 if (ret) {
456                         CtdlLogPrintf(CTDL_EMERG,
457                                 "db_close: %s\n", db_strerror(ret));
458                 }
459
460         }
461
462         /* Close the handle. */
463         ret = dbenv->close(dbenv, 0);
464         if (ret) {
465                 CtdlLogPrintf(CTDL_EMERG,
466                         "DBENV->close: %s\n", db_strerror(ret));
467         }
468 }
469
470
471 /*
472  * Compression functions only used if we have zlib
473  */
474 void cdb_decompress_if_necessary(struct cdbdata *cdb)
475 {
476         static int magic = COMPRESS_MAGIC;
477
478         if (cdb == NULL)
479                 return;
480         if (cdb->ptr == NULL)
481                 return;
482         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
483                 return;
484
485 #ifdef HAVE_ZLIB
486         /* At this point we know we're looking at a compressed item. */
487
488         struct CtdlCompressHeader zheader;
489         char *uncompressed_data;
490         char *compressed_data;
491         uLongf destLen, sourceLen;
492
493         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
494
495         compressed_data = cdb->ptr;
496         compressed_data += sizeof(struct CtdlCompressHeader);
497
498         sourceLen = (uLongf) zheader.compressed_len;
499         destLen = (uLongf) zheader.uncompressed_len;
500         uncompressed_data = malloc(zheader.uncompressed_len);
501
502         if (uncompress((Bytef *) uncompressed_data,
503                        (uLongf *) & destLen,
504                        (const Bytef *) compressed_data,
505                        (uLong) sourceLen) != Z_OK) {
506                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
507                 abort();
508         }
509
510         free(cdb->ptr);
511         cdb->len = (size_t) destLen;
512         cdb->ptr = uncompressed_data;
513 #else                           /* HAVE_ZLIB */
514         CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
515         abort();
516 #endif                          /* HAVE_ZLIB */
517 }
518
519
520
521 /*
522  * Store a piece of data.  Returns 0 if the operation was successful.  If a
523  * key already exists it should be overwritten.
524  */
525 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
526 {
527
528         DBT dkey, ddata;
529         DB_TXN *tid;
530         int ret = 0;
531
532 #ifdef HAVE_ZLIB
533         struct CtdlCompressHeader zheader;
534         char *compressed_data = NULL;
535         int compressing = 0;
536         size_t buffer_len = 0;
537         uLongf destLen = 0;
538 #endif
539
540         memset(&dkey, 0, sizeof(DBT));
541         memset(&ddata, 0, sizeof(DBT));
542         dkey.size = ckeylen;
543         dkey.data = ckey;
544         ddata.size = cdatalen;
545         ddata.data = cdata;
546
547 #ifdef HAVE_ZLIB
548         /* Only compress Visit records.  Everything else is uncompressed. */
549         if (cdb == CDB_VISIT) {
550                 compressing = 1;
551                 zheader.magic = COMPRESS_MAGIC;
552                 zheader.uncompressed_len = cdatalen;
553                 buffer_len = ((cdatalen * 101) / 100) + 100
554                     + sizeof(struct CtdlCompressHeader);
555                 destLen = (uLongf) buffer_len;
556                 compressed_data = malloc(buffer_len);
557                 if (compress2((Bytef *) (compressed_data +
558                                          sizeof(struct
559                                                 CtdlCompressHeader)),
560                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
561                               1) != Z_OK) {
562                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
563                         abort();
564                 }
565                 zheader.compressed_len = (size_t) destLen;
566                 memcpy(compressed_data, &zheader,
567                        sizeof(struct CtdlCompressHeader));
568                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
569                                        zheader.compressed_len);
570                 ddata.data = compressed_data;
571         }
572 #endif
573
574         if (MYTID != NULL) {
575                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
576                                     MYTID,      /* transaction ID */
577                                     &dkey,      /* key */
578                                     &ddata,     /* data */
579                                     0); /* flags */
580                 if (ret) {
581                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
582                                 db_strerror(ret));
583                         abort();
584                 }
585 #ifdef HAVE_ZLIB
586                 if (compressing)
587                         free(compressed_data);
588 #endif
589                 return ret;
590
591         } else {
592                 bailIfCursor(MYCURSORS,
593                              "attempt to write during r/o cursor");
594
595               retry:
596                 txbegin(&tid);
597
598                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
599                                          tid,   /* transaction ID */
600                                          &dkey, /* key */
601                                          &ddata,        /* data */
602                                          0))) { /* flags */
603                         if (ret == DB_LOCK_DEADLOCK) {
604                                 txabort(tid);
605                                 goto retry;
606                         } else {
607                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
608                                         cdb, db_strerror(ret));
609                                 abort();
610                         }
611                 } else {
612                         txcommit(tid);
613 #ifdef HAVE_ZLIB
614                         if (compressing)
615                                 free(compressed_data);
616 #endif
617                         return ret;
618                 }
619         }
620 }
621
622
623 /*
624  * Delete a piece of data.  Returns 0 if the operation was successful.
625  */
626 int cdb_delete(int cdb, void *key, int keylen)
627 {
628
629         DBT dkey;
630         DB_TXN *tid;
631         int ret;
632
633         memset(&dkey, 0, sizeof dkey);
634         dkey.size = keylen;
635         dkey.data = key;
636
637         if (MYTID != NULL) {
638                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
639                 if (ret) {
640                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
641                                 db_strerror(ret));
642                         if (ret != DB_NOTFOUND)
643                                 abort();
644                 }
645         } else {
646                 bailIfCursor(MYCURSORS,
647                              "attempt to delete during r/o cursor");
648
649               retry:
650                 txbegin(&tid);
651
652                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
653                     && ret != DB_NOTFOUND) {
654                         if (ret == DB_LOCK_DEADLOCK) {
655                                 txabort(tid);
656                                 goto retry;
657                         } else {
658                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
659                                         cdb, db_strerror(ret));
660                                 abort();
661                         }
662                 } else {
663                         txcommit(tid);
664                 }
665         }
666         return ret;
667 }
668
669 static DBC *localcursor(int cdb)
670 {
671         int ret;
672         DBC *curs;
673
674         if (MYCURSORS[cdb] == NULL)
675                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
676         else
677                 ret =
678                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
679                                           DB_POSITION);
680
681         if (ret) {
682                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
683                 abort();
684         }
685
686         return curs;
687 }
688
689
690 /*
691  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
692  * a struct cdbdata which it is the caller's responsibility to free later on
693  * using the cdb_free() routine.
694  */
695 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
696 {
697
698         struct cdbdata *tempcdb;
699         DBT dkey, dret;
700         int ret;
701
702         memset(&dkey, 0, sizeof(DBT));
703         dkey.size = keylen;
704         dkey.data = key;
705
706         if (MYTID != NULL) {
707                 memset(&dret, 0, sizeof(DBT));
708                 dret.flags = DB_DBT_MALLOC;
709                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
710         } else {
711                 DBC *curs;
712
713                 do {
714                         memset(&dret, 0, sizeof(DBT));
715                         dret.flags = DB_DBT_MALLOC;
716
717                         curs = localcursor(cdb);
718
719                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
720                         cclose(curs);
721                 }
722                 while (ret == DB_LOCK_DEADLOCK);
723
724         }
725
726         if ((ret != 0) && (ret != DB_NOTFOUND)) {
727                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
728                         db_strerror(ret));
729                 abort();
730         }
731
732         if (ret != 0)
733                 return NULL;
734         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
735
736         if (tempcdb == NULL) {
737                 CtdlLogPrintf(CTDL_EMERG,
738                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
739                 abort();
740         }
741
742         tempcdb->len = dret.size;
743         tempcdb->ptr = dret.data;
744         cdb_decompress_if_necessary(tempcdb);
745         return (tempcdb);
746 }
747
748
749 /*
750  * Free a cdbdata item.
751  *
752  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
753  * other code to assume ownership of that memory simply by storing the
754  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
755  * avoid freeing it.
756  */
757 void cdb_free(struct cdbdata *cdb)
758 {
759         if (cdb->ptr) {
760                 free(cdb->ptr);
761         }
762         free(cdb);
763 }
764
765 void cdb_close_cursor(int cdb)
766 {
767         if (MYCURSORS[cdb] != NULL)
768                 cclose(MYCURSORS[cdb]);
769
770         MYCURSORS[cdb] = NULL;
771 }
772
773 /* 
774  * Prepare for a sequential search of an entire database.
775  * (There is guaranteed to be no more than one traversal in
776  * progress per thread at any given time.)
777  */
778 void cdb_rewind(int cdb)
779 {
780         int ret = 0;
781
782         if (MYCURSORS[cdb] != NULL) {
783                 CtdlLogPrintf(CTDL_EMERG,
784                         "cdb_rewind: must close cursor on database %d before reopening.\n",
785                         cdb);
786                 abort();
787                 /* cclose(MYCURSORS[cdb]); */
788         }
789
790         /*
791          * Now initialize the cursor
792          */
793         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
794         if (ret) {
795                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
796                         db_strerror(ret));
797                 abort();
798         }
799 }
800
801
802 /*
803  * Fetch the next item in a sequential search.  Returns a pointer to a 
804  * cdbdata structure, or NULL if we've hit the end.
805  */
806 struct cdbdata *cdb_next_item(int cdb)
807 {
808         DBT key, data;
809         struct cdbdata *cdbret;
810         int ret = 0;
811
812         /* Initialize the key/data pair so the flags aren't set. */
813         memset(&key, 0, sizeof(key));
814         memset(&data, 0, sizeof(data));
815         data.flags = DB_DBT_MALLOC;
816
817         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
818
819         if (ret) {
820                 if (ret != DB_NOTFOUND) {
821                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
822                                 cdb, db_strerror(ret));
823                         abort();
824                 }
825                 cclose(MYCURSORS[cdb]);
826                 MYCURSORS[cdb] = NULL;
827                 return NULL;    /* presumably, end of file */
828         }
829
830         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
831         cdbret->len = data.size;
832         cdbret->ptr = data.data;
833         cdb_decompress_if_necessary(cdbret);
834
835         return (cdbret);
836 }
837
838
839
840 /*
841  * Transaction-based stuff.  I'm writing this as I bake cookies...
842  */
843
844 void cdb_begin_transaction(void)
845 {
846
847         bailIfCursor(MYCURSORS,
848                      "can't begin transaction during r/o cursor");
849
850         if (MYTID != NULL) {
851                 CtdlLogPrintf(CTDL_EMERG,
852                         "cdb_begin_transaction: ERROR: nested transaction\n");
853                 abort();
854         }
855
856         txbegin(&MYTID);
857 }
858
859 void cdb_end_transaction(void)
860 {
861         int i;
862
863         for (i = 0; i < MAXCDB; i++)
864                 if (MYCURSORS[i] != NULL) {
865                         CtdlLogPrintf(CTDL_WARNING,
866                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
867                                 i);
868                         cclose(MYCURSORS[i]);
869                         MYCURSORS[i] = NULL;
870                 }
871
872         if (MYTID == NULL) {
873                 CtdlLogPrintf(CTDL_EMERG,
874                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
875                 abort();
876         } else
877                 txcommit(MYTID);
878
879         MYTID = NULL;
880 }
881
882 /*
883  * Truncate (delete every record)
884  */
885 void cdb_trunc(int cdb)
886 {
887         /* DB_TXN *tid; */
888         int ret;
889         u_int32_t count;
890
891         if (MYTID != NULL) {
892                 CtdlLogPrintf(CTDL_EMERG,
893                         "cdb_trunc must not be called in a transaction.\n");
894                 abort();
895         } else {
896                 bailIfCursor(MYCURSORS,
897                              "attempt to write during r/o cursor");
898
899               retry:
900                 /* txbegin(&tid); */
901
902                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
903                                               NULL,     /* transaction ID */
904                                               &count,   /* #rows deleted */
905                                               0))) {    /* flags */
906                         if (ret == DB_LOCK_DEADLOCK) {
907                                 /* txabort(tid); */
908                                 goto retry;
909                         } else {
910                                 CtdlLogPrintf(CTDL_EMERG,
911                                         "cdb_truncate(%d): %s\n", cdb,
912                                         db_strerror(ret));
913                                 abort();
914                         }
915                 } else {
916                         /* txcommit(tid); */
917                 }
918         }
919 }