Began removing $Id$ tags. This will be an ongoing process.
[citadel.git] / citadel / database.c
1 /*
2  * This is a data store backend for the Citadel server which uses Berkeley DB.
3  *
4  * Copyright (c) 1987-2009 by the citadel.org team
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21
22 /*****************************************************************************
23        Tunable configuration parameters for the Berkeley DB back end
24  *****************************************************************************/
25
26 /* Citadel will checkpoint the db at the end of every session, but only if
27  * the specified number of kilobytes has been written, or if the specified
28  * number of minutes has passed, since the last checkpoint.
29  */
30 #define MAX_CHECKPOINT_KBYTES   256
31 #define MAX_CHECKPOINT_MINUTES  15
32
33 /*****************************************************************************/
34
35 #include "sysdep.h"
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <stdio.h>
39 #include <ctype.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <dirent.h>
45
46 #ifdef HAVE_DB_H
47 #include <db.h>
48 #elif defined(HAVE_DB4_DB_H)
49 #include <db4/db.h>
50 #else
51 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
52 #endif
53
54
55 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
56 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
57 #endif
58
59
60 #include <libcitadel.h>
61 #include "citadel.h"
62 #include "server.h"
63 #include "citserver.h"
64 #include "database.h"
65 #include "msgbase.h"
66 #include "sysdep_decls.h"
67 #include "threads.h"
68 #include "config.h"
69 #include "control.h"
70
71 #include "ctdl_module.h"
72
73
74 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
75 static DB_ENV *dbenv;           /* The DB environment (global) */
76
77
78 #ifdef HAVE_ZLIB
79 #include <zlib.h>
80 #endif
81
82
83 /* Verbose logging callback */
84 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
85 {
86         if (!IsEmptyStr(msg)) {
87                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
88         }
89 }
90
91
92 /* Verbose logging callback */
93 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
94 {
95         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
96 }
97
98
99 /* just a little helper function */
100 static void txabort(DB_TXN * tid)
101 {
102         int ret;
103
104         ret = tid->abort(tid);
105
106         if (ret) {
107                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_abort: %s\n", db_strerror(ret));
108                 abort();
109         }
110 }
111
112 /* this one is even more helpful than the last. */
113 static void txcommit(DB_TXN * tid)
114 {
115         int ret;
116
117         ret = tid->commit(tid, 0);
118
119         if (ret) {
120                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret));
121                 abort();
122         }
123 }
124
125 /* are you sensing a pattern yet? */
126 static void txbegin(DB_TXN ** tid)
127 {
128         int ret;
129
130         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
131
132         if (ret) {
133                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret));
134                 abort();
135         }
136 }
137
138 static void dbpanic(DB_ENV * env, int errval)
139 {
140         CtdlLogPrintf(CTDL_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval));
141 }
142
143 static void cclose(DBC * cursor)
144 {
145         int ret;
146
147         if ((ret = cursor->c_close(cursor))) {
148                 CtdlLogPrintf(CTDL_EMERG, "bdb(): c_close: %s\n", db_strerror(ret));
149                 abort();
150         }
151 }
152
153 static void bailIfCursor(DBC ** cursors, const char *msg)
154 {
155         int i;
156
157         for (i = 0; i < MAXCDB; i++)
158                 if (cursors[i] != NULL) {
159                         CtdlLogPrintf(CTDL_EMERG,
160                                 "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg);
161                         abort();
162                 }
163 }
164
165 void check_handles(void *arg)
166 {
167         if (arg != NULL) {
168                 ThreadTSD *tsd = (ThreadTSD *) arg;
169
170                 bailIfCursor(tsd->cursors, "in check_handles");
171
172                 if (tsd->tid != NULL) {
173                         CtdlLogPrintf(CTDL_EMERG, "bdb(): transaction still in progress!");
174                         abort();
175                 }
176         }
177 }
178
179 void cdb_check_handles(void)
180 {
181         check_handles(pthread_getspecific(ThreadKey));
182 }
183
184
185 /*
186  * Cull the database logs
187  */
188 static void cdb_cull_logs(void)
189 {
190         u_int32_t flags;
191         int ret;
192         char **file, **list;
193         char errmsg[SIZ];
194
195         flags = DB_ARCH_ABS;
196
197         /* Get the list of names. */
198         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
199                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
200                 return;
201         }
202
203         /* Print the list of names. */
204         if (list != NULL) {
205                 for (file = list; *file != NULL; ++file) {
206                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
207                         ret = unlink(*file);
208                         if (ret != 0) {
209                                 snprintf(errmsg, sizeof(errmsg),
210                                          " ** ERROR **\n \n \n "
211                                          "Citadel was unable to delete the "
212                                          "database log file '%s' because of the "
213                                          "following error:\n \n %s\n \n"
214                                          " This log file is no longer in use "
215                                          "and may be safely deleted.\n",
216                                          *file, strerror(errno));
217                                 CtdlAideMessage(errmsg, "Database Warning Message");
218                         }
219                 }
220                 free(list);
221         }
222 }
223
224 /*
225  * Manually initiate log file cull.
226  */
227 void cmd_cull(char *argbuf) {
228         if (CtdlAccessCheck(ac_internal)) return;
229         cdb_cull_logs();
230         cprintf("%d Database log file cull completed.\n", CIT_OK);
231 }
232
233
234 /*
235  * Request a checkpoint of the database.  Called once per minute by the thread manager.
236  */
237 void cdb_checkpoint(void)
238 {
239         int ret;
240
241         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
242         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
243
244         if (ret != 0) {
245                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
246                 abort();
247         }
248
249         /* After a successful checkpoint, we can cull the unused logs */
250         if (config.c_auto_cull) {
251                 cdb_cull_logs();
252         }
253 }
254
255
256
257 /*
258  * Open the various databases we'll be using.  Any database which
259  * does not exist should be created.  Note that we don't need a
260  * critical section here, because there aren't any active threads
261  * manipulating the database yet.
262  */
263 void open_databases(void)
264 {
265         int ret;
266         int i;
267         char dbfilename[32];
268         u_int32_t flags = 0;
269         int dbversion_major, dbversion_minor, dbversion_patch;
270         int current_dbversion = 0;
271
272         CtdlLogPrintf(CTDL_DEBUG, "bdb(): open_databases() starting\n");
273         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
274         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
275                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
276
277         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
278
279         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
280         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
281
282         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
283            && (CitControl.MMdbversion > current_dbversion) ) {
284                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
285                                         "of Berkeley DB that is older than that which last created or\n"
286                                         "updated the database.  Because this would probably cause data\n"
287                                         "corruption or loss, the server is aborting execution now.\n");
288                 exit(CTDLEXIT_DB);
289         }
290
291         CitControl.MMdbversion = current_dbversion;
292         put_control();
293
294 #ifdef HAVE_ZLIB
295         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
296 #endif
297
298         /*
299          * Silently try to create the database subdirectory.  If it's
300          * already there, no problem.
301          */
302         if ((mkdir(ctdl_data_dir, 0700) != 0) && (errno != EEXIST)){
303                 CtdlLogPrintf(CTDL_EMERG, 
304                               "unable to create database directory [%s]: %s", 
305                               ctdl_data_dir, strerror(errno));
306         }
307         if (chmod(ctdl_data_dir, 0700) != 0){
308                 CtdlLogPrintf(CTDL_EMERG, 
309                               "unable to set database directory accessrights [%s]: %s", 
310                               ctdl_data_dir, strerror(errno));
311         }
312         if (chown(ctdl_data_dir, CTDLUID, (-1)) != 0){
313                 CtdlLogPrintf(CTDL_EMERG, 
314                               "unable to set the owner for [%s]: %s", 
315                               ctdl_data_dir, strerror(errno));
316         }
317         CtdlLogPrintf(CTDL_DEBUG, "bdb(): Setting up DB environment\n");
318         db_env_set_func_yield((int (*)(u_long,  u_long))sched_yield);
319         ret = db_env_create(&dbenv, 0);
320         if (ret) {
321                 CtdlLogPrintf(CTDL_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
322                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
323                 exit(CTDLEXIT_DB);
324         }
325         dbenv->set_errpfx(dbenv, "citserver");
326         dbenv->set_paniccall(dbenv, dbpanic);
327         dbenv->set_errcall(dbenv, cdb_verbose_err);
328         dbenv->set_errpfx(dbenv, "ctdl");
329 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
330         dbenv->set_msgcall(dbenv, cdb_verbose_log);
331 #endif
332         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
333         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
334
335         /*
336          * We want to specify the shared memory buffer pool cachesize,
337          * but everything else is the default.
338          */
339         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
340         if (ret) {
341                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
342                 dbenv->close(dbenv, 0);
343                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
344                 exit(CTDLEXIT_DB);
345         }
346
347         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
348                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
349                 dbenv->close(dbenv, 0);
350                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
351                 exit(CTDLEXIT_DB);
352         }
353
354         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
355         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
356         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
357         if (ret == DB_RUNRECOVERY) {
358                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
359                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
360                 flags |= DB_RECOVER;
361                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
362         }
363         if (ret == DB_RUNRECOVERY) {
364                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
365                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
366                 flags &= ~DB_RECOVER;
367                 flags |= DB_RECOVER_FATAL;
368                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
369         }
370         if (ret) {
371                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
372                 dbenv->close(dbenv, 0);
373                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
374                 exit(CTDLEXIT_DB);
375         }
376
377         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
378
379         for (i = 0; i < MAXCDB; ++i) {
380
381                 /* Create a database handle */
382                 ret = db_create(&dbp[i], dbenv, 0);
383                 if (ret) {
384                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
385                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
386                         exit(CTDLEXIT_DB);
387                 }
388
389
390                 /* Arbitrary names for our tables -- we reference them by
391                  * number, so we don't have string names for them.
392                  */
393                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
394
395                 ret = dbp[i]->open(dbp[i],
396                                    NULL,
397                                    dbfilename,
398                                    NULL,
399                                    DB_BTREE,
400                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
401                                    0600
402                 );
403                 if (ret) {
404                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
405                         if (ret == ENOMEM) {
406                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
407                         }
408                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
409                         exit(CTDLEXIT_DB);
410                 }
411         }
412
413 }
414
415
416 /* Make sure we own all the files, because in a few milliseconds
417  * we're going to drop root privs.
418  */
419 void cdb_chmod_data(void) {
420         DIR *dp;
421         struct dirent *d;
422         char filename[PATH_MAX];
423
424         dp = opendir(ctdl_data_dir);
425         if (dp != NULL) {
426                 while (d = readdir(dp), d != NULL) {
427                         if (d->d_name[0] != '.') {
428                                 snprintf(filename, sizeof filename,
429                                          "%s/%s", ctdl_data_dir, d->d_name);
430                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
431                                         filename, chmod(filename, 0600)
432                                 );
433                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
434                                         filename, chown(filename, CTDLUID, (-1))
435                                 );
436                         }
437                 }
438                 closedir(dp);
439         }
440
441         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
442         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
443 }
444
445
446 /*
447  * Close all of the db database files we've opened.  This can be done
448  * in a loop, since it's just a bunch of closes.
449  */
450 void close_databases(void)
451 {
452         int a;
453         int ret;
454
455         ctdl_thread_internal_free_tsd();
456         
457         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
458                 CtdlLogPrintf(CTDL_EMERG,
459                         "txn_checkpoint: %s\n", db_strerror(ret));
460         }
461
462         /* print some statistics... */
463 #ifdef DB_STAT_ALL
464         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
465 #endif
466
467         /* close the tables */
468         for (a = 0; a < MAXCDB; ++a) {
469                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
470                 ret = dbp[a]->close(dbp[a], 0);
471                 if (ret) {
472                         CtdlLogPrintf(CTDL_EMERG, "db_close: %s\n", db_strerror(ret));
473                 }
474
475         }
476
477         /* Close the handle. */
478         ret = dbenv->close(dbenv, 0);
479         if (ret) {
480                 CtdlLogPrintf(CTDL_EMERG, "DBENV->close: %s\n", db_strerror(ret));
481         }
482 }
483
484
485 /*
486  * Compression functions only used if we have zlib
487  */
488 void cdb_decompress_if_necessary(struct cdbdata *cdb)
489 {
490         static int magic = COMPRESS_MAGIC;
491
492         if ((cdb == NULL) || 
493             (cdb->ptr == NULL) || 
494             (cdb->len < sizeof(magic)) ||
495             (memcmp(cdb->ptr, &magic, sizeof(magic))))
496             return;
497
498 #ifdef HAVE_ZLIB
499         /* At this point we know we're looking at a compressed item. */
500
501         struct CtdlCompressHeader zheader;
502         char *uncompressed_data;
503         char *compressed_data;
504         uLongf destLen, sourceLen;
505         size_t cplen;
506
507         memset(&zheader, 0, sizeof(struct CtdlCompressHeader));
508         cplen = sizeof(struct CtdlCompressHeader);
509         if (sizeof(struct CtdlCompressHeader) > cdb->len)
510                 cplen = cdb->len;
511         memcpy(&zheader, cdb->ptr, cplen);
512
513         compressed_data = cdb->ptr;
514         compressed_data += sizeof(struct CtdlCompressHeader);
515
516         sourceLen = (uLongf) zheader.compressed_len;
517         destLen = (uLongf) zheader.uncompressed_len;
518         uncompressed_data = malloc(zheader.uncompressed_len);
519
520         if (uncompress((Bytef *) uncompressed_data,
521                        (uLongf *) & destLen,
522                        (const Bytef *) compressed_data,
523                        (uLong) sourceLen) != Z_OK) {
524                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
525                 abort();
526         }
527
528         free(cdb->ptr);
529         cdb->len = (size_t) destLen;
530         cdb->ptr = uncompressed_data;
531 #else                           /* HAVE_ZLIB */
532         CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
533         abort();
534 #endif                          /* HAVE_ZLIB */
535 }
536
537
538
539 /*
540  * Store a piece of data.  Returns 0 if the operation was successful.  If a
541  * key already exists it should be overwritten.
542  */
543 int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
544 {
545
546         DBT dkey, ddata;
547         DB_TXN *tid;
548         int ret = 0;
549
550 #ifdef HAVE_ZLIB
551         struct CtdlCompressHeader zheader;
552         char *compressed_data = NULL;
553         int compressing = 0;
554         size_t buffer_len = 0;
555         uLongf destLen = 0;
556 #endif
557
558         memset(&dkey, 0, sizeof(DBT));
559         memset(&ddata, 0, sizeof(DBT));
560         dkey.size = ckeylen;
561         dkey.data = ckey;
562         ddata.size = cdatalen;
563         ddata.data = cdata;
564
565 #ifdef HAVE_ZLIB
566         /* Only compress Visit records.  Everything else is uncompressed. */
567         if (cdb == CDB_VISIT) {
568                 compressing = 1;
569                 zheader.magic = COMPRESS_MAGIC;
570                 zheader.uncompressed_len = cdatalen;
571                 buffer_len = ((cdatalen * 101) / 100) + 100
572                     + sizeof(struct CtdlCompressHeader);
573                 destLen = (uLongf) buffer_len;
574                 compressed_data = malloc(buffer_len);
575                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
576                         &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
577                 {
578                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
579                         abort();
580                 }
581                 zheader.compressed_len = (size_t) destLen;
582                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
583                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
584                 ddata.data = compressed_data;
585         }
586 #endif
587
588         if (MYTID != NULL) {
589                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
590                                     MYTID,      /* transaction ID */
591                                     &dkey,      /* key */
592                                     &ddata,     /* data */
593                                     0); /* flags */
594                 if (ret) {
595                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
596                         abort();
597                 }
598 #ifdef HAVE_ZLIB
599                 if (compressing)
600                         free(compressed_data);
601 #endif
602                 return ret;
603
604         } else {
605                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
606
607               retry:
608                 txbegin(&tid);
609
610                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
611                                          tid,   /* transaction ID */
612                                          &dkey, /* key */
613                                          &ddata,        /* data */
614                                          0))) { /* flags */
615                         if (ret == DB_LOCK_DEADLOCK) {
616                                 txabort(tid);
617                                 goto retry;
618                         } else {
619                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
620                                         cdb, db_strerror(ret));
621                                 abort();
622                         }
623                 } else {
624                         txcommit(tid);
625 #ifdef HAVE_ZLIB
626                         if (compressing) {
627                                 free(compressed_data);
628                         }
629 #endif
630                         return ret;
631                 }
632         }
633 }
634
635
636 /*
637  * Delete a piece of data.  Returns 0 if the operation was successful.
638  */
639 int cdb_delete(int cdb, void *key, int keylen)
640 {
641
642         DBT dkey;
643         DB_TXN *tid;
644         int ret;
645
646         memset(&dkey, 0, sizeof dkey);
647         dkey.size = keylen;
648         dkey.data = key;
649
650         if (MYTID != NULL) {
651                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
652                 if (ret) {
653                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
654                         if (ret != DB_NOTFOUND) {
655                                 abort();
656                         }
657                 }
658         } else {
659                 bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
660
661               retry:
662                 txbegin(&tid);
663
664                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
665                     && ret != DB_NOTFOUND) {
666                         if (ret == DB_LOCK_DEADLOCK) {
667                                 txabort(tid);
668                                 goto retry;
669                         } else {
670                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
671                                         cdb, db_strerror(ret));
672                                 abort();
673                         }
674                 } else {
675                         txcommit(tid);
676                 }
677         }
678         return ret;
679 }
680
681 static DBC *localcursor(int cdb)
682 {
683         int ret;
684         DBC *curs;
685
686         if (MYCURSORS[cdb] == NULL)
687                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
688         else
689                 ret =
690                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
691                                           DB_POSITION);
692
693         if (ret) {
694                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
695                 abort();
696         }
697
698         return curs;
699 }
700
701
702 /*
703  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
704  * a struct cdbdata which it is the caller's responsibility to free later on
705  * using the cdb_free() routine.
706  */
707 struct cdbdata *cdb_fetch(int cdb, const void *key, int keylen)
708 {
709
710         struct cdbdata *tempcdb;
711         DBT dkey, dret;
712         int ret;
713
714         memset(&dkey, 0, sizeof(DBT));
715         dkey.size = keylen;
716         dkey.data = key;
717
718         if (MYTID != NULL) {
719                 memset(&dret, 0, sizeof(DBT));
720                 dret.flags = DB_DBT_MALLOC;
721                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
722         } else {
723                 DBC *curs;
724
725                 do {
726                         memset(&dret, 0, sizeof(DBT));
727                         dret.flags = DB_DBT_MALLOC;
728
729                         curs = localcursor(cdb);
730
731                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
732                         cclose(curs);
733                 }
734                 while (ret == DB_LOCK_DEADLOCK);
735
736         }
737
738         if ((ret != 0) && (ret != DB_NOTFOUND)) {
739                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
740                 abort();
741         }
742
743         if (ret != 0)
744                 return NULL;
745         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
746
747         if (tempcdb == NULL) {
748                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
749                 abort();
750         }
751
752         tempcdb->len = dret.size;
753         tempcdb->ptr = dret.data;
754         cdb_decompress_if_necessary(tempcdb);
755         return (tempcdb);
756 }
757
758
759 /*
760  * Free a cdbdata item.
761  *
762  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
763  * other code to assume ownership of that memory simply by storing the
764  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
765  * avoid freeing it.
766  */
767 void cdb_free(struct cdbdata *cdb)
768 {
769         if (cdb->ptr) {
770                 free(cdb->ptr);
771         }
772         free(cdb);
773 }
774
775 void cdb_close_cursor(int cdb)
776 {
777         if (MYCURSORS[cdb] != NULL) {
778                 cclose(MYCURSORS[cdb]);
779         }
780
781         MYCURSORS[cdb] = NULL;
782 }
783
784 /* 
785  * Prepare for a sequential search of an entire database.
786  * (There is guaranteed to be no more than one traversal in
787  * progress per thread at any given time.)
788  */
789 void cdb_rewind(int cdb)
790 {
791         int ret = 0;
792
793         if (MYCURSORS[cdb] != NULL) {
794                 CtdlLogPrintf(CTDL_EMERG,
795                         "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
796                 abort();
797                 /* cclose(MYCURSORS[cdb]); */
798         }
799
800         /*
801          * Now initialize the cursor
802          */
803         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
804         if (ret) {
805                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
806                 abort();
807         }
808 }
809
810
811 /*
812  * Fetch the next item in a sequential search.  Returns a pointer to a 
813  * cdbdata structure, or NULL if we've hit the end.
814  */
815 struct cdbdata *cdb_next_item(int cdb)
816 {
817         DBT key, data;
818         struct cdbdata *cdbret;
819         int ret = 0;
820
821         /* Initialize the key/data pair so the flags aren't set. */
822         memset(&key, 0, sizeof(key));
823         memset(&data, 0, sizeof(data));
824         data.flags = DB_DBT_MALLOC;
825
826         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
827
828         if (ret) {
829                 if (ret != DB_NOTFOUND) {
830                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
831                         abort();
832                 }
833                 cclose(MYCURSORS[cdb]);
834                 MYCURSORS[cdb] = NULL;
835                 return NULL;    /* presumably, end of file */
836         }
837
838         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
839         cdbret->len = data.size;
840         cdbret->ptr = data.data;
841         cdb_decompress_if_necessary(cdbret);
842
843         return (cdbret);
844 }
845
846
847
848 /*
849  * Transaction-based stuff.  I'm writing this as I bake cookies...
850  */
851
852 void cdb_begin_transaction(void)
853 {
854
855         bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
856
857         if (MYTID != NULL) {
858                 CtdlLogPrintf(CTDL_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
859                 abort();
860         }
861
862         txbegin(&MYTID);
863 }
864
865 void cdb_end_transaction(void)
866 {
867         int i;
868
869         for (i = 0; i < MAXCDB; i++)
870                 if (MYCURSORS[i] != NULL) {
871                         CtdlLogPrintf(CTDL_WARNING,
872                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
873                                 i);
874                         cclose(MYCURSORS[i]);
875                         MYCURSORS[i] = NULL;
876                 }
877
878         if (MYTID == NULL) {
879                 CtdlLogPrintf(CTDL_EMERG,
880                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
881                 abort();
882         } else {
883                 txcommit(MYTID);
884         }
885
886         MYTID = NULL;
887 }
888
889 /*
890  * Truncate (delete every record)
891  */
892 void cdb_trunc(int cdb)
893 {
894         /* DB_TXN *tid; */
895         int ret;
896         u_int32_t count;
897
898         if (MYTID != NULL) {
899                 CtdlLogPrintf(CTDL_EMERG,
900                         "cdb_trunc must not be called in a transaction.\n");
901                 abort();
902         } else {
903                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
904
905               retry:
906                 /* txbegin(&tid); */
907
908                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
909                                               NULL,     /* transaction ID */
910                                               &count,   /* #rows deleted */
911                                               0))) {    /* flags */
912                         if (ret == DB_LOCK_DEADLOCK) {
913                                 /* txabort(tid); */
914                                 goto retry;
915                         } else {
916                                 CtdlLogPrintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
917                                 if (ret == ENOMEM) {
918                                         CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
919                                 }
920                                 abort();
921                         }
922                 } else {
923                         /* txcommit(tid); */
924                 }
925         }
926 }