* be more accurate, len needs to be at least sizeof(magic)
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  * Copyright (c) 1987-2009 by the citadel.org team
7  *
8  *  This program is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation; either version 3 of the License, or
11  *  (at your option) any later version.
12  *
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  *
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 /*****************************************************************************
25        Tunable configuration parameters for the Berkeley DB back end
26  *****************************************************************************/
27
28 /* Citadel will checkpoint the db at the end of every session, but only if
29  * the specified number of kilobytes has been written, or if the specified
30  * number of minutes has passed, since the last checkpoint.
31  */
32 #define MAX_CHECKPOINT_KBYTES   256
33 #define MAX_CHECKPOINT_MINUTES  15
34
35 /*****************************************************************************/
36
37 #include "sysdep.h"
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <ctype.h>
42 #include <string.h>
43 #include <errno.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <dirent.h>
47
48 #ifdef HAVE_DB_H
49 #include <db.h>
50 #elif defined(HAVE_DB4_DB_H)
51 #include <db4/db.h>
52 #else
53 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
54 #endif
55
56
57 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
58 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
59 #endif
60
61
62 #include <libcitadel.h>
63 #include "citadel.h"
64 #include "server.h"
65 #include "citserver.h"
66 #include "database.h"
67 #include "msgbase.h"
68 #include "sysdep_decls.h"
69 #include "threads.h"
70 #include "config.h"
71 #include "control.h"
72
73 #include "ctdl_module.h"
74
75
76 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
77 static DB_ENV *dbenv;           /* The DB environment (global) */
78
79
80 #ifdef HAVE_ZLIB
81 #include <zlib.h>
82 #endif
83
84
85 /* Verbose logging callback */
86 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
87 {
88         if (!IsEmptyStr(msg)) {
89                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
90         }
91 }
92
93
94 /* Verbose logging callback */
95 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
96 {
97         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
98 }
99
100
101 /* just a little helper function */
102 static void txabort(DB_TXN * tid)
103 {
104         int ret;
105
106         ret = tid->abort(tid);
107
108         if (ret) {
109                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_abort: %s\n", db_strerror(ret));
110                 abort();
111         }
112 }
113
114 /* this one is even more helpful than the last. */
115 static void txcommit(DB_TXN * tid)
116 {
117         int ret;
118
119         ret = tid->commit(tid, 0);
120
121         if (ret) {
122                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret));
123                 abort();
124         }
125 }
126
127 /* are you sensing a pattern yet? */
128 static void txbegin(DB_TXN ** tid)
129 {
130         int ret;
131
132         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
133
134         if (ret) {
135                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret));
136                 abort();
137         }
138 }
139
140 static void dbpanic(DB_ENV * env, int errval)
141 {
142         CtdlLogPrintf(CTDL_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval));
143 }
144
145 static void cclose(DBC * cursor)
146 {
147         int ret;
148
149         if ((ret = cursor->c_close(cursor))) {
150                 CtdlLogPrintf(CTDL_EMERG, "bdb(): c_close: %s\n", db_strerror(ret));
151                 abort();
152         }
153 }
154
155 static void bailIfCursor(DBC ** cursors, const char *msg)
156 {
157         int i;
158
159         for (i = 0; i < MAXCDB; i++)
160                 if (cursors[i] != NULL) {
161                         CtdlLogPrintf(CTDL_EMERG,
162                                 "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg);
163                         abort();
164                 }
165 }
166
167 void check_handles(void *arg)
168 {
169         if (arg != NULL) {
170                 ThreadTSD *tsd = (ThreadTSD *) arg;
171
172                 bailIfCursor(tsd->cursors, "in check_handles");
173
174                 if (tsd->tid != NULL) {
175                         CtdlLogPrintf(CTDL_EMERG, "bdb(): transaction still in progress!");
176                         abort();
177                 }
178         }
179 }
180
181 void cdb_check_handles(void)
182 {
183         check_handles(pthread_getspecific(ThreadKey));
184 }
185
186
187 /*
188  * Cull the database logs
189  */
190 static void cdb_cull_logs(void)
191 {
192         u_int32_t flags;
193         int ret;
194         char **file, **list;
195         char errmsg[SIZ];
196
197         flags = DB_ARCH_ABS;
198
199         /* Get the list of names. */
200         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
201                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
202                 return;
203         }
204
205         /* Print the list of names. */
206         if (list != NULL) {
207                 for (file = list; *file != NULL; ++file) {
208                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
209                         ret = unlink(*file);
210                         if (ret != 0) {
211                                 snprintf(errmsg, sizeof(errmsg),
212                                          " ** ERROR **\n \n \n "
213                                          "Citadel was unable to delete the "
214                                          "database log file '%s' because of the "
215                                          "following error:\n \n %s\n \n"
216                                          " This log file is no longer in use "
217                                          "and may be safely deleted.\n",
218                                          *file, strerror(errno));
219                                 CtdlAideMessage(errmsg, "Database Warning Message");
220                         }
221                 }
222                 free(list);
223         }
224 }
225
226 /*
227  * Manually initiate log file cull.
228  */
229 void cmd_cull(char *argbuf) {
230         if (CtdlAccessCheck(ac_internal)) return;
231         cdb_cull_logs();
232         cprintf("%d Database log file cull completed.\n", CIT_OK);
233 }
234
235
236 /*
237  * Request a checkpoint of the database.  Called once per minute by the thread manager.
238  */
239 void cdb_checkpoint(void)
240 {
241         int ret;
242
243         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
244         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
245
246         if (ret != 0) {
247                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
248                 abort();
249         }
250
251         /* After a successful checkpoint, we can cull the unused logs */
252         if (config.c_auto_cull) {
253                 cdb_cull_logs();
254         }
255 }
256
257
258
259 /*
260  * Open the various databases we'll be using.  Any database which
261  * does not exist should be created.  Note that we don't need a
262  * critical section here, because there aren't any active threads
263  * manipulating the database yet.
264  */
265 void open_databases(void)
266 {
267         int ret;
268         int i;
269         char dbfilename[32];
270         u_int32_t flags = 0;
271         int dbversion_major, dbversion_minor, dbversion_patch;
272         int current_dbversion = 0;
273
274         CtdlLogPrintf(CTDL_DEBUG, "bdb(): open_databases() starting\n");
275         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
276         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
277                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
278
279         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
280
281         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
282         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
283
284         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
285            && (CitControl.MMdbversion > current_dbversion) ) {
286                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
287                                         "of Berkeley DB that is older than that which last created or\n"
288                                         "updated the database.  Because this would probably cause data\n"
289                                         "corruption or loss, the server is aborting execution now.\n");
290                 exit(CTDLEXIT_DB);
291         }
292
293         CitControl.MMdbversion = current_dbversion;
294         put_control();
295
296 #ifdef HAVE_ZLIB
297         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
298 #endif
299
300         /*
301          * Silently try to create the database subdirectory.  If it's
302          * already there, no problem.
303          */
304         if ((mkdir(ctdl_data_dir, 0700) != 0) && (errno != EEXIST)){
305                 CtdlLogPrintf(CTDL_EMERG, 
306                               "unable to create database directory [%s]: %s", 
307                               ctdl_data_dir, strerror(errno));
308         }
309         if (chmod(ctdl_data_dir, 0700) != 0){
310                 CtdlLogPrintf(CTDL_EMERG, 
311                               "unable to set database directory accessrights [%s]: %s", 
312                               ctdl_data_dir, strerror(errno));
313         }
314         if (chown(ctdl_data_dir, CTDLUID, (-1)) != 0){
315                 CtdlLogPrintf(CTDL_EMERG, 
316                               "unable to set the owner for [%s]: %s", 
317                               ctdl_data_dir, strerror(errno));
318         }
319         CtdlLogPrintf(CTDL_DEBUG, "bdb(): Setting up DB environment\n");
320         db_env_set_func_yield(sched_yield);
321         ret = db_env_create(&dbenv, 0);
322         if (ret) {
323                 CtdlLogPrintf(CTDL_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
324                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
325                 exit(CTDLEXIT_DB);
326         }
327         dbenv->set_errpfx(dbenv, "citserver");
328         dbenv->set_paniccall(dbenv, dbpanic);
329         dbenv->set_errcall(dbenv, cdb_verbose_err);
330         dbenv->set_errpfx(dbenv, "ctdl");
331 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
332         dbenv->set_msgcall(dbenv, cdb_verbose_log);
333 #endif
334         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
335         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
336
337         /*
338          * We want to specify the shared memory buffer pool cachesize,
339          * but everything else is the default.
340          */
341         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
342         if (ret) {
343                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
344                 dbenv->close(dbenv, 0);
345                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
346                 exit(CTDLEXIT_DB);
347         }
348
349         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
350                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
351                 dbenv->close(dbenv, 0);
352                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
353                 exit(CTDLEXIT_DB);
354         }
355
356         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
357         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
358         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
359         if (ret == DB_RUNRECOVERY) {
360                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
361                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
362                 flags |= DB_RECOVER;
363                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
364         }
365         if (ret == DB_RUNRECOVERY) {
366                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
367                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
368                 flags &= ~DB_RECOVER;
369                 flags |= DB_RECOVER_FATAL;
370                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
371         }
372         if (ret) {
373                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
374                 dbenv->close(dbenv, 0);
375                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
376                 exit(CTDLEXIT_DB);
377         }
378
379         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
380
381         for (i = 0; i < MAXCDB; ++i) {
382
383                 /* Create a database handle */
384                 ret = db_create(&dbp[i], dbenv, 0);
385                 if (ret) {
386                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
387                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
388                         exit(CTDLEXIT_DB);
389                 }
390
391
392                 /* Arbitrary names for our tables -- we reference them by
393                  * number, so we don't have string names for them.
394                  */
395                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
396
397                 ret = dbp[i]->open(dbp[i],
398                                    NULL,
399                                    dbfilename,
400                                    NULL,
401                                    DB_BTREE,
402                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
403                                    0600
404                 );
405                 if (ret) {
406                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
407                         if (ret == ENOMEM) {
408                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
409                         }
410                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
411                         exit(CTDLEXIT_DB);
412                 }
413         }
414
415 }
416
417
418 /* Make sure we own all the files, because in a few milliseconds
419  * we're going to drop root privs.
420  */
421 void cdb_chmod_data(void) {
422         DIR *dp;
423         struct dirent *d;
424         char filename[PATH_MAX];
425
426         dp = opendir(ctdl_data_dir);
427         if (dp != NULL) {
428                 while (d = readdir(dp), d != NULL) {
429                         if (d->d_name[0] != '.') {
430                                 snprintf(filename, sizeof filename,
431                                          "%s/%s", ctdl_data_dir, d->d_name);
432                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
433                                         filename, chmod(filename, 0600)
434                                 );
435                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
436                                         filename, chown(filename, CTDLUID, (-1))
437                                 );
438                         }
439                 }
440                 closedir(dp);
441         }
442
443         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
444         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
445 }
446
447
448 /*
449  * Close all of the db database files we've opened.  This can be done
450  * in a loop, since it's just a bunch of closes.
451  */
452 void close_databases(void)
453 {
454         int a;
455         int ret;
456
457         ctdl_thread_internal_free_tsd();
458         
459         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
460                 CtdlLogPrintf(CTDL_EMERG,
461                         "txn_checkpoint: %s\n", db_strerror(ret));
462         }
463
464         /* print some statistics... */
465 #ifdef DB_STAT_ALL
466         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
467 #endif
468
469         /* close the tables */
470         for (a = 0; a < MAXCDB; ++a) {
471                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
472                 ret = dbp[a]->close(dbp[a], 0);
473                 if (ret) {
474                         CtdlLogPrintf(CTDL_EMERG, "db_close: %s\n", db_strerror(ret));
475                 }
476
477         }
478
479         /* Close the handle. */
480         ret = dbenv->close(dbenv, 0);
481         if (ret) {
482                 CtdlLogPrintf(CTDL_EMERG, "DBENV->close: %s\n", db_strerror(ret));
483         }
484 }
485
486
487 /*
488  * Compression functions only used if we have zlib
489  */
490 void cdb_decompress_if_necessary(struct cdbdata *cdb)
491 {
492         static int magic = COMPRESS_MAGIC;
493
494         if ((cdb == NULL) || 
495             (cdb->ptr == NULL) || 
496             (cdb->len < sizeof(magic)) ||
497             (memcmp(cdb->ptr, &magic, sizeof(magic))))
498             return;
499
500 #ifdef HAVE_ZLIB
501         /* At this point we know we're looking at a compressed item. */
502
503         struct CtdlCompressHeader zheader;
504         char *uncompressed_data;
505         char *compressed_data;
506         uLongf destLen, sourceLen;
507
508         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
509
510         compressed_data = cdb->ptr;
511         compressed_data += sizeof(struct CtdlCompressHeader);
512
513         sourceLen = (uLongf) zheader.compressed_len;
514         destLen = (uLongf) zheader.uncompressed_len;
515         uncompressed_data = malloc(zheader.uncompressed_len);
516
517         if (uncompress((Bytef *) uncompressed_data,
518                        (uLongf *) & destLen,
519                        (const Bytef *) compressed_data,
520                        (uLong) sourceLen) != Z_OK) {
521                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
522                 abort();
523         }
524
525         free(cdb->ptr);
526         cdb->len = (size_t) destLen;
527         cdb->ptr = uncompressed_data;
528 #else                           /* HAVE_ZLIB */
529         CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
530         abort();
531 #endif                          /* HAVE_ZLIB */
532 }
533
534
535
536 /*
537  * Store a piece of data.  Returns 0 if the operation was successful.  If a
538  * key already exists it should be overwritten.
539  */
540 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
541 {
542
543         DBT dkey, ddata;
544         DB_TXN *tid;
545         int ret = 0;
546
547 #ifdef HAVE_ZLIB
548         struct CtdlCompressHeader zheader;
549         char *compressed_data = NULL;
550         int compressing = 0;
551         size_t buffer_len = 0;
552         uLongf destLen = 0;
553 #endif
554
555         memset(&dkey, 0, sizeof(DBT));
556         memset(&ddata, 0, sizeof(DBT));
557         dkey.size = ckeylen;
558         dkey.data = ckey;
559         ddata.size = cdatalen;
560         ddata.data = cdata;
561
562 #ifdef HAVE_ZLIB
563         /* Only compress Visit records.  Everything else is uncompressed. */
564         if (cdb == CDB_VISIT) {
565                 compressing = 1;
566                 zheader.magic = COMPRESS_MAGIC;
567                 zheader.uncompressed_len = cdatalen;
568                 buffer_len = ((cdatalen * 101) / 100) + 100
569                     + sizeof(struct CtdlCompressHeader);
570                 destLen = (uLongf) buffer_len;
571                 compressed_data = malloc(buffer_len);
572                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
573                         &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
574                 {
575                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
576                         abort();
577                 }
578                 zheader.compressed_len = (size_t) destLen;
579                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
580                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
581                 ddata.data = compressed_data;
582         }
583 #endif
584
585         if (MYTID != NULL) {
586                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
587                                     MYTID,      /* transaction ID */
588                                     &dkey,      /* key */
589                                     &ddata,     /* data */
590                                     0); /* flags */
591                 if (ret) {
592                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
593                         abort();
594                 }
595 #ifdef HAVE_ZLIB
596                 if (compressing)
597                         free(compressed_data);
598 #endif
599                 return ret;
600
601         } else {
602                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
603
604               retry:
605                 txbegin(&tid);
606
607                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
608                                          tid,   /* transaction ID */
609                                          &dkey, /* key */
610                                          &ddata,        /* data */
611                                          0))) { /* flags */
612                         if (ret == DB_LOCK_DEADLOCK) {
613                                 txabort(tid);
614                                 goto retry;
615                         } else {
616                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
617                                         cdb, db_strerror(ret));
618                                 abort();
619                         }
620                 } else {
621                         txcommit(tid);
622 #ifdef HAVE_ZLIB
623                         if (compressing) {
624                                 free(compressed_data);
625                         }
626 #endif
627                         return ret;
628                 }
629         }
630 }
631
632
633 /*
634  * Delete a piece of data.  Returns 0 if the operation was successful.
635  */
636 int cdb_delete(int cdb, void *key, int keylen)
637 {
638
639         DBT dkey;
640         DB_TXN *tid;
641         int ret;
642
643         memset(&dkey, 0, sizeof dkey);
644         dkey.size = keylen;
645         dkey.data = key;
646
647         if (MYTID != NULL) {
648                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
649                 if (ret) {
650                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
651                         if (ret != DB_NOTFOUND) {
652                                 abort();
653                         }
654                 }
655         } else {
656                 bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
657
658               retry:
659                 txbegin(&tid);
660
661                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
662                     && ret != DB_NOTFOUND) {
663                         if (ret == DB_LOCK_DEADLOCK) {
664                                 txabort(tid);
665                                 goto retry;
666                         } else {
667                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
668                                         cdb, db_strerror(ret));
669                                 abort();
670                         }
671                 } else {
672                         txcommit(tid);
673                 }
674         }
675         return ret;
676 }
677
678 static DBC *localcursor(int cdb)
679 {
680         int ret;
681         DBC *curs;
682
683         if (MYCURSORS[cdb] == NULL)
684                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
685         else
686                 ret =
687                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
688                                           DB_POSITION);
689
690         if (ret) {
691                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
692                 abort();
693         }
694
695         return curs;
696 }
697
698
699 /*
700  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
701  * a struct cdbdata which it is the caller's responsibility to free later on
702  * using the cdb_free() routine.
703  */
704 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
705 {
706
707         struct cdbdata *tempcdb;
708         DBT dkey, dret;
709         int ret;
710
711         memset(&dkey, 0, sizeof(DBT));
712         dkey.size = keylen;
713         dkey.data = key;
714
715         if (MYTID != NULL) {
716                 memset(&dret, 0, sizeof(DBT));
717                 dret.flags = DB_DBT_MALLOC;
718                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
719         } else {
720                 DBC *curs;
721
722                 do {
723                         memset(&dret, 0, sizeof(DBT));
724                         dret.flags = DB_DBT_MALLOC;
725
726                         curs = localcursor(cdb);
727
728                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
729                         cclose(curs);
730                 }
731                 while (ret == DB_LOCK_DEADLOCK);
732
733         }
734
735         if ((ret != 0) && (ret != DB_NOTFOUND)) {
736                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
737                 abort();
738         }
739
740         if (ret != 0)
741                 return NULL;
742         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
743
744         if (tempcdb == NULL) {
745                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
746                 abort();
747         }
748
749         tempcdb->len = dret.size;
750         tempcdb->ptr = dret.data;
751         cdb_decompress_if_necessary(tempcdb);
752         return (tempcdb);
753 }
754
755
756 /*
757  * Free a cdbdata item.
758  *
759  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
760  * other code to assume ownership of that memory simply by storing the
761  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
762  * avoid freeing it.
763  */
764 void cdb_free(struct cdbdata *cdb)
765 {
766         if (cdb->ptr) {
767                 free(cdb->ptr);
768         }
769         free(cdb);
770 }
771
772 void cdb_close_cursor(int cdb)
773 {
774         if (MYCURSORS[cdb] != NULL) {
775                 cclose(MYCURSORS[cdb]);
776         }
777
778         MYCURSORS[cdb] = NULL;
779 }
780
781 /* 
782  * Prepare for a sequential search of an entire database.
783  * (There is guaranteed to be no more than one traversal in
784  * progress per thread at any given time.)
785  */
786 void cdb_rewind(int cdb)
787 {
788         int ret = 0;
789
790         if (MYCURSORS[cdb] != NULL) {
791                 CtdlLogPrintf(CTDL_EMERG,
792                         "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
793                 abort();
794                 /* cclose(MYCURSORS[cdb]); */
795         }
796
797         /*
798          * Now initialize the cursor
799          */
800         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
801         if (ret) {
802                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
803                 abort();
804         }
805 }
806
807
808 /*
809  * Fetch the next item in a sequential search.  Returns a pointer to a 
810  * cdbdata structure, or NULL if we've hit the end.
811  */
812 struct cdbdata *cdb_next_item(int cdb)
813 {
814         DBT key, data;
815         struct cdbdata *cdbret;
816         int ret = 0;
817
818         /* Initialize the key/data pair so the flags aren't set. */
819         memset(&key, 0, sizeof(key));
820         memset(&data, 0, sizeof(data));
821         data.flags = DB_DBT_MALLOC;
822
823         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
824
825         if (ret) {
826                 if (ret != DB_NOTFOUND) {
827                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
828                         abort();
829                 }
830                 cclose(MYCURSORS[cdb]);
831                 MYCURSORS[cdb] = NULL;
832                 return NULL;    /* presumably, end of file */
833         }
834
835         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
836         cdbret->len = data.size;
837         cdbret->ptr = data.data;
838         cdb_decompress_if_necessary(cdbret);
839
840         return (cdbret);
841 }
842
843
844
845 /*
846  * Transaction-based stuff.  I'm writing this as I bake cookies...
847  */
848
849 void cdb_begin_transaction(void)
850 {
851
852         bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
853
854         if (MYTID != NULL) {
855                 CtdlLogPrintf(CTDL_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
856                 abort();
857         }
858
859         txbegin(&MYTID);
860 }
861
862 void cdb_end_transaction(void)
863 {
864         int i;
865
866         for (i = 0; i < MAXCDB; i++)
867                 if (MYCURSORS[i] != NULL) {
868                         CtdlLogPrintf(CTDL_WARNING,
869                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
870                                 i);
871                         cclose(MYCURSORS[i]);
872                         MYCURSORS[i] = NULL;
873                 }
874
875         if (MYTID == NULL) {
876                 CtdlLogPrintf(CTDL_EMERG,
877                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
878                 abort();
879         } else {
880                 txcommit(MYTID);
881         }
882
883         MYTID = NULL;
884 }
885
886 /*
887  * Truncate (delete every record)
888  */
889 void cdb_trunc(int cdb)
890 {
891         /* DB_TXN *tid; */
892         int ret;
893         u_int32_t count;
894
895         if (MYTID != NULL) {
896                 CtdlLogPrintf(CTDL_EMERG,
897                         "cdb_trunc must not be called in a transaction.\n");
898                 abort();
899         } else {
900                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
901
902               retry:
903                 /* txbegin(&tid); */
904
905                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
906                                               NULL,     /* transaction ID */
907                                               &count,   /* #rows deleted */
908                                               0))) {    /* flags */
909                         if (ret == DB_LOCK_DEADLOCK) {
910                                 /* txabort(tid); */
911                                 goto retry;
912                         } else {
913                                 CtdlLogPrintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
914                                 if (ret == ENOMEM) {
915                                         CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
916                                 }
917                                 abort();
918                         }
919                 } else {
920                         /* txcommit(tid); */
921                 }
922         }
923 }