95b4f9a60a104044232e8138558e80d10b825789
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  * Copyright (c) 1987-2009 by the citadel.org team
7  *
8  *  This program is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation; either version 3 of the License, or
11  *  (at your option) any later version.
12  *
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  *
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 /*****************************************************************************
25        Tunable configuration parameters for the Berkeley DB back end
26  *****************************************************************************/
27
28 /* Citadel will checkpoint the db at the end of every session, but only if
29  * the specified number of kilobytes has been written, or if the specified
30  * number of minutes has passed, since the last checkpoint.
31  */
32 #define MAX_CHECKPOINT_KBYTES   256
33 #define MAX_CHECKPOINT_MINUTES  15
34
35 /*****************************************************************************/
36
37 #include "sysdep.h"
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <ctype.h>
42 #include <string.h>
43 #include <errno.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <dirent.h>
47
48 #ifdef HAVE_DB_H
49 #include <db.h>
50 #elif defined(HAVE_DB4_DB_H)
51 #include <db4/db.h>
52 #else
53 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
54 #endif
55
56
57 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
58 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
59 #endif
60
61
62 #include <libcitadel.h>
63 #include "citadel.h"
64 #include "server.h"
65 #include "citserver.h"
66 #include "database.h"
67 #include "msgbase.h"
68 #include "sysdep_decls.h"
69 #include "threads.h"
70 #include "config.h"
71 #include "control.h"
72
73 #include "ctdl_module.h"
74
75
76 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
77 static DB_ENV *dbenv;           /* The DB environment (global) */
78
79
80 #ifdef HAVE_ZLIB
81 #include <zlib.h>
82 #endif
83
84
85 /* Verbose logging callback */
86 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
87 {
88         if (!IsEmptyStr(msg)) {
89                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
90         }
91 }
92
93
94 /* Verbose logging callback */
95 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
96 {
97         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
98 }
99
100
101 /* just a little helper function */
102 static void txabort(DB_TXN * tid)
103 {
104         int ret;
105
106         ret = tid->abort(tid);
107
108         if (ret) {
109                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_abort: %s\n", db_strerror(ret));
110                 abort();
111         }
112 }
113
114 /* this one is even more helpful than the last. */
115 static void txcommit(DB_TXN * tid)
116 {
117         int ret;
118
119         ret = tid->commit(tid, 0);
120
121         if (ret) {
122                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret));
123                 abort();
124         }
125 }
126
127 /* are you sensing a pattern yet? */
128 static void txbegin(DB_TXN ** tid)
129 {
130         int ret;
131
132         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
133
134         if (ret) {
135                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret));
136                 abort();
137         }
138 }
139
140 static void dbpanic(DB_ENV * env, int errval)
141 {
142         CtdlLogPrintf(CTDL_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval));
143 }
144
145 static void cclose(DBC * cursor)
146 {
147         int ret;
148
149         if ((ret = cursor->c_close(cursor))) {
150                 CtdlLogPrintf(CTDL_EMERG, "bdb(): c_close: %s\n", db_strerror(ret));
151                 abort();
152         }
153 }
154
155 static void bailIfCursor(DBC ** cursors, const char *msg)
156 {
157         int i;
158
159         for (i = 0; i < MAXCDB; i++)
160                 if (cursors[i] != NULL) {
161                         CtdlLogPrintf(CTDL_EMERG,
162                                 "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg);
163                         abort();
164                 }
165 }
166
167 void check_handles(void *arg)
168 {
169         if (arg != NULL) {
170                 ThreadTSD *tsd = (ThreadTSD *) arg;
171
172                 bailIfCursor(tsd->cursors, "in check_handles");
173
174                 if (tsd->tid != NULL) {
175                         CtdlLogPrintf(CTDL_EMERG, "bdb(): transaction still in progress!");
176                         abort();
177                 }
178         }
179 }
180
181 void cdb_check_handles(void)
182 {
183         check_handles(pthread_getspecific(ThreadKey));
184 }
185
186
187 /*
188  * Cull the database logs
189  */
190 static void cdb_cull_logs(void)
191 {
192         u_int32_t flags;
193         int ret;
194         char **file, **list;
195         char errmsg[SIZ];
196
197         flags = DB_ARCH_ABS;
198
199         /* Get the list of names. */
200         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
201                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
202                 return;
203         }
204
205         /* Print the list of names. */
206         if (list != NULL) {
207                 for (file = list; *file != NULL; ++file) {
208                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
209                         ret = unlink(*file);
210                         if (ret != 0) {
211                                 snprintf(errmsg, sizeof(errmsg),
212                                          " ** ERROR **\n \n \n "
213                                          "Citadel was unable to delete the "
214                                          "database log file '%s' because of the "
215                                          "following error:\n \n %s\n \n"
216                                          " This log file is no longer in use "
217                                          "and may be safely deleted.\n",
218                                          *file, strerror(errno));
219                                 CtdlAideMessage(errmsg, "Database Warning Message");
220                         }
221                 }
222                 free(list);
223         }
224 }
225
226 /*
227  * Manually initiate log file cull.
228  */
229 void cmd_cull(char *argbuf) {
230         if (CtdlAccessCheck(ac_internal)) return;
231         cdb_cull_logs();
232         cprintf("%d Database log file cull completed.\n", CIT_OK);
233 }
234
235
236 /*
237  * Request a checkpoint of the database.  Called once per minute by the thread manager.
238  */
239 void cdb_checkpoint(void)
240 {
241         int ret;
242
243         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
244         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
245
246         if (ret != 0) {
247                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
248                 abort();
249         }
250
251         /* After a successful checkpoint, we can cull the unused logs */
252         if (config.c_auto_cull) {
253                 cdb_cull_logs();
254         }
255 }
256
257
258
259 /*
260  * Open the various databases we'll be using.  Any database which
261  * does not exist should be created.  Note that we don't need a
262  * critical section here, because there aren't any active threads
263  * manipulating the database yet.
264  */
265 void open_databases(void)
266 {
267         int ret;
268         int i;
269         char dbfilename[32];
270         u_int32_t flags = 0;
271         int dbversion_major, dbversion_minor, dbversion_patch;
272         int current_dbversion = 0;
273
274         CtdlLogPrintf(CTDL_DEBUG, "bdb(): open_databases() starting\n");
275         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
276         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
277                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
278
279         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
280
281         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
282         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
283
284         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
285            && (CitControl.MMdbversion > current_dbversion) ) {
286                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
287                                         "of Berkeley DB that is older than that which last created or\n"
288                                         "updated the database.  Because this would probably cause data\n"
289                                         "corruption or loss, the server is aborting execution now.\n");
290                 exit(CTDLEXIT_DB);
291         }
292
293         CitControl.MMdbversion = current_dbversion;
294         put_control();
295
296 #ifdef HAVE_ZLIB
297         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
298 #endif
299
300         /*
301          * Silently try to create the database subdirectory.  If it's
302          * already there, no problem.
303          */
304         if ((mkdir(ctdl_data_dir, 0700) != 0) && (errno != EEXIST)){
305                 CtdlLogPrintf(CTDL_EMERG, 
306                               "unable to create database directory [%s]: %s", 
307                               ctdl_data_dir, strerror(errno));
308         }
309         if (chmod(ctdl_data_dir, 0700) != 0){
310                 CtdlLogPrintf(CTDL_EMERG, 
311                               "unable to set database directory accessrights [%s]: %s", 
312                               ctdl_data_dir, strerror(errno));
313         }
314         if (chown(ctdl_data_dir, CTDLUID, (-1)) != 0){
315                 CtdlLogPrintf(CTDL_EMERG, 
316                               "unable to set the owner for [%s]: %s", 
317                               ctdl_data_dir, strerror(errno));
318         }
319         CtdlLogPrintf(CTDL_DEBUG, "bdb(): Setting up DB environment\n");
320         db_env_set_func_yield((int (*)(u_long,  u_long))sched_yield);
321         ret = db_env_create(&dbenv, 0);
322         if (ret) {
323                 CtdlLogPrintf(CTDL_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
324                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
325                 exit(CTDLEXIT_DB);
326         }
327         dbenv->set_errpfx(dbenv, "citserver");
328         dbenv->set_paniccall(dbenv, dbpanic);
329         dbenv->set_errcall(dbenv, cdb_verbose_err);
330         dbenv->set_errpfx(dbenv, "ctdl");
331 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
332         dbenv->set_msgcall(dbenv, cdb_verbose_log);
333 #endif
334         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
335         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
336
337         /*
338          * We want to specify the shared memory buffer pool cachesize,
339          * but everything else is the default.
340          */
341         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
342         if (ret) {
343                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
344                 dbenv->close(dbenv, 0);
345                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
346                 exit(CTDLEXIT_DB);
347         }
348
349         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
350                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
351                 dbenv->close(dbenv, 0);
352                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
353                 exit(CTDLEXIT_DB);
354         }
355
356         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
357         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
358         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
359         if (ret == DB_RUNRECOVERY) {
360                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
361                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
362                 flags |= DB_RECOVER;
363                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
364         }
365         if (ret == DB_RUNRECOVERY) {
366                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
367                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
368                 flags &= ~DB_RECOVER;
369                 flags |= DB_RECOVER_FATAL;
370                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
371         }
372         if (ret) {
373                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
374                 dbenv->close(dbenv, 0);
375                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
376                 exit(CTDLEXIT_DB);
377         }
378
379         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
380
381         for (i = 0; i < MAXCDB; ++i) {
382
383                 /* Create a database handle */
384                 ret = db_create(&dbp[i], dbenv, 0);
385                 if (ret) {
386                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
387                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
388                         exit(CTDLEXIT_DB);
389                 }
390
391
392                 /* Arbitrary names for our tables -- we reference them by
393                  * number, so we don't have string names for them.
394                  */
395                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
396
397                 ret = dbp[i]->open(dbp[i],
398                                    NULL,
399                                    dbfilename,
400                                    NULL,
401                                    DB_BTREE,
402                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
403                                    0600
404                 );
405                 if (ret) {
406                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
407                         if (ret == ENOMEM) {
408                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
409                         }
410                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
411                         exit(CTDLEXIT_DB);
412                 }
413         }
414
415 }
416
417
418 /* Make sure we own all the files, because in a few milliseconds
419  * we're going to drop root privs.
420  */
421 void cdb_chmod_data(void) {
422         DIR *dp;
423         struct dirent *d;
424         char filename[PATH_MAX];
425
426         dp = opendir(ctdl_data_dir);
427         if (dp != NULL) {
428                 while (d = readdir(dp), d != NULL) {
429                         if (d->d_name[0] != '.') {
430                                 snprintf(filename, sizeof filename,
431                                          "%s/%s", ctdl_data_dir, d->d_name);
432                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
433                                         filename, chmod(filename, 0600)
434                                 );
435                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
436                                         filename, chown(filename, CTDLUID, (-1))
437                                 );
438                         }
439                 }
440                 closedir(dp);
441         }
442
443         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
444         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
445 }
446
447
448 /*
449  * Close all of the db database files we've opened.  This can be done
450  * in a loop, since it's just a bunch of closes.
451  */
452 void close_databases(void)
453 {
454         int a;
455         int ret;
456
457         ctdl_thread_internal_free_tsd();
458         
459         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
460                 CtdlLogPrintf(CTDL_EMERG,
461                         "txn_checkpoint: %s\n", db_strerror(ret));
462         }
463
464         /* print some statistics... */
465 #ifdef DB_STAT_ALL
466         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
467 #endif
468
469         /* close the tables */
470         for (a = 0; a < MAXCDB; ++a) {
471                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
472                 ret = dbp[a]->close(dbp[a], 0);
473                 if (ret) {
474                         CtdlLogPrintf(CTDL_EMERG, "db_close: %s\n", db_strerror(ret));
475                 }
476
477         }
478
479         /* Close the handle. */
480         ret = dbenv->close(dbenv, 0);
481         if (ret) {
482                 CtdlLogPrintf(CTDL_EMERG, "DBENV->close: %s\n", db_strerror(ret));
483         }
484 }
485
486
487 /*
488  * Compression functions only used if we have zlib
489  */
490 void cdb_decompress_if_necessary(struct cdbdata *cdb)
491 {
492         static int magic = COMPRESS_MAGIC;
493
494         if ((cdb == NULL) || 
495             (cdb->ptr == NULL) || 
496             (cdb->len < sizeof(magic)) ||
497             (memcmp(cdb->ptr, &magic, sizeof(magic))))
498             return;
499
500 #ifdef HAVE_ZLIB
501         /* At this point we know we're looking at a compressed item. */
502
503         struct CtdlCompressHeader zheader;
504         char *uncompressed_data;
505         char *compressed_data;
506         uLongf destLen, sourceLen;
507         size_t cplen;
508
509         memset(&zheader, 0, sizeof(struct CtdlCompressHeader));
510         cplen = sizeof(struct CtdlCompressHeader);
511         if (sizeof(struct CtdlCompressHeader) > cdb->len)
512                 cplen = cdb->len;
513         memcpy(&zheader, cdb->ptr, cplen);
514
515         compressed_data = cdb->ptr;
516         compressed_data += sizeof(struct CtdlCompressHeader);
517
518         sourceLen = (uLongf) zheader.compressed_len;
519         destLen = (uLongf) zheader.uncompressed_len;
520         uncompressed_data = malloc(zheader.uncompressed_len);
521
522         if (uncompress((Bytef *) uncompressed_data,
523                        (uLongf *) & destLen,
524                        (const Bytef *) compressed_data,
525                        (uLong) sourceLen) != Z_OK) {
526                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
527                 abort();
528         }
529
530         free(cdb->ptr);
531         cdb->len = (size_t) destLen;
532         cdb->ptr = uncompressed_data;
533 #else                           /* HAVE_ZLIB */
534         CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
535         abort();
536 #endif                          /* HAVE_ZLIB */
537 }
538
539
540
541 /*
542  * Store a piece of data.  Returns 0 if the operation was successful.  If a
543  * key already exists it should be overwritten.
544  */
545 int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
546 {
547
548         DBT dkey, ddata;
549         DB_TXN *tid;
550         int ret = 0;
551
552 #ifdef HAVE_ZLIB
553         struct CtdlCompressHeader zheader;
554         char *compressed_data = NULL;
555         int compressing = 0;
556         size_t buffer_len = 0;
557         uLongf destLen = 0;
558 #endif
559
560         memset(&dkey, 0, sizeof(DBT));
561         memset(&ddata, 0, sizeof(DBT));
562         dkey.size = ckeylen;
563         dkey.data = ckey;
564         ddata.size = cdatalen;
565         ddata.data = cdata;
566
567 #ifdef HAVE_ZLIB
568         /* Only compress Visit records.  Everything else is uncompressed. */
569         if (cdb == CDB_VISIT) {
570                 compressing = 1;
571                 zheader.magic = COMPRESS_MAGIC;
572                 zheader.uncompressed_len = cdatalen;
573                 buffer_len = ((cdatalen * 101) / 100) + 100
574                     + sizeof(struct CtdlCompressHeader);
575                 destLen = (uLongf) buffer_len;
576                 compressed_data = malloc(buffer_len);
577                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
578                         &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
579                 {
580                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
581                         abort();
582                 }
583                 zheader.compressed_len = (size_t) destLen;
584                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
585                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
586                 ddata.data = compressed_data;
587         }
588 #endif
589
590         if (MYTID != NULL) {
591                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
592                                     MYTID,      /* transaction ID */
593                                     &dkey,      /* key */
594                                     &ddata,     /* data */
595                                     0); /* flags */
596                 if (ret) {
597                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
598                         abort();
599                 }
600 #ifdef HAVE_ZLIB
601                 if (compressing)
602                         free(compressed_data);
603 #endif
604                 return ret;
605
606         } else {
607                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
608
609               retry:
610                 txbegin(&tid);
611
612                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
613                                          tid,   /* transaction ID */
614                                          &dkey, /* key */
615                                          &ddata,        /* data */
616                                          0))) { /* flags */
617                         if (ret == DB_LOCK_DEADLOCK) {
618                                 txabort(tid);
619                                 goto retry;
620                         } else {
621                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
622                                         cdb, db_strerror(ret));
623                                 abort();
624                         }
625                 } else {
626                         txcommit(tid);
627 #ifdef HAVE_ZLIB
628                         if (compressing) {
629                                 free(compressed_data);
630                         }
631 #endif
632                         return ret;
633                 }
634         }
635 }
636
637
638 /*
639  * Delete a piece of data.  Returns 0 if the operation was successful.
640  */
641 int cdb_delete(int cdb, void *key, int keylen)
642 {
643
644         DBT dkey;
645         DB_TXN *tid;
646         int ret;
647
648         memset(&dkey, 0, sizeof dkey);
649         dkey.size = keylen;
650         dkey.data = key;
651
652         if (MYTID != NULL) {
653                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
654                 if (ret) {
655                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
656                         if (ret != DB_NOTFOUND) {
657                                 abort();
658                         }
659                 }
660         } else {
661                 bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
662
663               retry:
664                 txbegin(&tid);
665
666                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
667                     && ret != DB_NOTFOUND) {
668                         if (ret == DB_LOCK_DEADLOCK) {
669                                 txabort(tid);
670                                 goto retry;
671                         } else {
672                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
673                                         cdb, db_strerror(ret));
674                                 abort();
675                         }
676                 } else {
677                         txcommit(tid);
678                 }
679         }
680         return ret;
681 }
682
683 static DBC *localcursor(int cdb)
684 {
685         int ret;
686         DBC *curs;
687
688         if (MYCURSORS[cdb] == NULL)
689                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
690         else
691                 ret =
692                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
693                                           DB_POSITION);
694
695         if (ret) {
696                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
697                 abort();
698         }
699
700         return curs;
701 }
702
703
704 /*
705  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
706  * a struct cdbdata which it is the caller's responsibility to free later on
707  * using the cdb_free() routine.
708  */
709 struct cdbdata *cdb_fetch(int cdb, const void *key, int keylen)
710 {
711
712         struct cdbdata *tempcdb;
713         DBT dkey, dret;
714         int ret;
715
716         memset(&dkey, 0, sizeof(DBT));
717         dkey.size = keylen;
718         dkey.data = key;
719
720         if (MYTID != NULL) {
721                 memset(&dret, 0, sizeof(DBT));
722                 dret.flags = DB_DBT_MALLOC;
723                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
724         } else {
725                 DBC *curs;
726
727                 do {
728                         memset(&dret, 0, sizeof(DBT));
729                         dret.flags = DB_DBT_MALLOC;
730
731                         curs = localcursor(cdb);
732
733                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
734                         cclose(curs);
735                 }
736                 while (ret == DB_LOCK_DEADLOCK);
737
738         }
739
740         if ((ret != 0) && (ret != DB_NOTFOUND)) {
741                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
742                 abort();
743         }
744
745         if (ret != 0)
746                 return NULL;
747         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
748
749         if (tempcdb == NULL) {
750                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
751                 abort();
752         }
753
754         tempcdb->len = dret.size;
755         tempcdb->ptr = dret.data;
756         cdb_decompress_if_necessary(tempcdb);
757         return (tempcdb);
758 }
759
760
761 /*
762  * Free a cdbdata item.
763  *
764  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
765  * other code to assume ownership of that memory simply by storing the
766  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
767  * avoid freeing it.
768  */
769 void cdb_free(struct cdbdata *cdb)
770 {
771         if (cdb->ptr) {
772                 free(cdb->ptr);
773         }
774         free(cdb);
775 }
776
777 void cdb_close_cursor(int cdb)
778 {
779         if (MYCURSORS[cdb] != NULL) {
780                 cclose(MYCURSORS[cdb]);
781         }
782
783         MYCURSORS[cdb] = NULL;
784 }
785
786 /* 
787  * Prepare for a sequential search of an entire database.
788  * (There is guaranteed to be no more than one traversal in
789  * progress per thread at any given time.)
790  */
791 void cdb_rewind(int cdb)
792 {
793         int ret = 0;
794
795         if (MYCURSORS[cdb] != NULL) {
796                 CtdlLogPrintf(CTDL_EMERG,
797                         "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
798                 abort();
799                 /* cclose(MYCURSORS[cdb]); */
800         }
801
802         /*
803          * Now initialize the cursor
804          */
805         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
806         if (ret) {
807                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
808                 abort();
809         }
810 }
811
812
813 /*
814  * Fetch the next item in a sequential search.  Returns a pointer to a 
815  * cdbdata structure, or NULL if we've hit the end.
816  */
817 struct cdbdata *cdb_next_item(int cdb)
818 {
819         DBT key, data;
820         struct cdbdata *cdbret;
821         int ret = 0;
822
823         /* Initialize the key/data pair so the flags aren't set. */
824         memset(&key, 0, sizeof(key));
825         memset(&data, 0, sizeof(data));
826         data.flags = DB_DBT_MALLOC;
827
828         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
829
830         if (ret) {
831                 if (ret != DB_NOTFOUND) {
832                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
833                         abort();
834                 }
835                 cclose(MYCURSORS[cdb]);
836                 MYCURSORS[cdb] = NULL;
837                 return NULL;    /* presumably, end of file */
838         }
839
840         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
841         cdbret->len = data.size;
842         cdbret->ptr = data.data;
843         cdb_decompress_if_necessary(cdbret);
844
845         return (cdbret);
846 }
847
848
849
850 /*
851  * Transaction-based stuff.  I'm writing this as I bake cookies...
852  */
853
854 void cdb_begin_transaction(void)
855 {
856
857         bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
858
859         if (MYTID != NULL) {
860                 CtdlLogPrintf(CTDL_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
861                 abort();
862         }
863
864         txbegin(&MYTID);
865 }
866
867 void cdb_end_transaction(void)
868 {
869         int i;
870
871         for (i = 0; i < MAXCDB; i++)
872                 if (MYCURSORS[i] != NULL) {
873                         CtdlLogPrintf(CTDL_WARNING,
874                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
875                                 i);
876                         cclose(MYCURSORS[i]);
877                         MYCURSORS[i] = NULL;
878                 }
879
880         if (MYTID == NULL) {
881                 CtdlLogPrintf(CTDL_EMERG,
882                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
883                 abort();
884         } else {
885                 txcommit(MYTID);
886         }
887
888         MYTID = NULL;
889 }
890
891 /*
892  * Truncate (delete every record)
893  */
894 void cdb_trunc(int cdb)
895 {
896         /* DB_TXN *tid; */
897         int ret;
898         u_int32_t count;
899
900         if (MYTID != NULL) {
901                 CtdlLogPrintf(CTDL_EMERG,
902                         "cdb_trunc must not be called in a transaction.\n");
903                 abort();
904         } else {
905                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
906
907               retry:
908                 /* txbegin(&tid); */
909
910                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
911                                               NULL,     /* transaction ID */
912                                               &count,   /* #rows deleted */
913                                               0))) {    /* flags */
914                         if (ret == DB_LOCK_DEADLOCK) {
915                                 /* txabort(tid); */
916                                 goto retry;
917                         } else {
918                                 CtdlLogPrintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
919                                 if (ret == ENOMEM) {
920                                         CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
921                                 }
922                                 abort();
923                         }
924                 } else {
925                         /* txcommit(tid); */
926                 }
927         }
928 }