Add cit_backtrace() to cdb_abort()
[citadel.git] / citadel / database.c
1 /*
2  * This is a data store backend for the Citadel server which uses Berkeley DB.
3  *
4  * Copyright (c) 1987-2012 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License version 3.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  */
15
16 /*****************************************************************************
17        Tunable configuration parameters for the Berkeley DB back end
18  *****************************************************************************/
19
20 /* Citadel will checkpoint the db at the end of every session, but only if
21  * the specified number of kilobytes has been written, or if the specified
22  * number of minutes has passed, since the last checkpoint.
23  */
24 #define MAX_CHECKPOINT_KBYTES   256
25 #define MAX_CHECKPOINT_MINUTES  15
26
27 /*****************************************************************************/
28
29 #include "sysdep.h"
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <stdio.h>
33 #include <ctype.h>
34 #include <string.h>
35 #include <errno.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <dirent.h>
39 #include <syslog.h>
40 #include <zlib.h>
41
42 #ifdef HAVE_DB_H
43 #include <db.h>
44 #elif defined(HAVE_DB4_DB_H)
45 #include <db4/db.h>
46 #else
47 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
48 #endif
49
50
51 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
52 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
53 #endif
54
55
56 #include <libcitadel.h>
57 #include "citadel.h"
58 #include "server.h"
59 #include "citserver.h"
60 #include "database.h"
61 #include "msgbase.h"
62 #include "sysdep_decls.h"
63 #include "threads.h"
64 #include "config.h"
65 #include "control.h"
66
67 #include "ctdl_module.h"
68
69
70 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
71 static DB_ENV *dbenv;           /* The DB environment (global) */
72
73
74 void cdb_abort(void) {
75         syslog(LOG_DEBUG,
76                 "citserver is stopping in order to prevent data loss. uid=%d gid=%d euid=%d egid=%d",
77                 getuid(),
78                 getgid(),
79                 geteuid(),
80                 getegid()
81         );
82         cit_backtrace();
83         exit(CTDLEXIT_DB);
84 }
85
86
87 /* Verbose logging callback */
88 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
89 {
90         if (!IsEmptyStr(msg)) {
91                 syslog(LOG_DEBUG, "DB: %s", msg);
92         }
93 }
94
95
96 /* Verbose logging callback */
97 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
98 {
99         syslog(LOG_ALERT, "DB: %s", msg);
100 }
101
102
103 /* just a little helper function */
104 static void txabort(DB_TXN * tid)
105 {
106         int ret;
107
108         ret = tid->abort(tid);
109
110         if (ret) {
111                 syslog(LOG_EMERG, "bdb(): txn_abort: %s", db_strerror(ret));
112                 cdb_abort();
113         }
114 }
115
116 /* this one is even more helpful than the last. */
117 static void txcommit(DB_TXN * tid)
118 {
119         int ret;
120
121         ret = tid->commit(tid, 0);
122
123         if (ret) {
124                 syslog(LOG_EMERG, "bdb(): txn_commit: %s", db_strerror(ret));
125                 cdb_abort();
126         }
127 }
128
129 /* are you sensing a pattern yet? */
130 static void txbegin(DB_TXN ** tid)
131 {
132         int ret;
133
134         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
135
136         if (ret) {
137                 syslog(LOG_EMERG, "bdb(): txn_begin: %s", db_strerror(ret));
138                 cdb_abort();
139         }
140 }
141
142 static void dbpanic(DB_ENV * env, int errval)
143 {
144         syslog(LOG_EMERG, "bdb(): PANIC: %s", db_strerror(errval));
145 }
146
147 static void cclose(DBC * cursor)
148 {
149         int ret;
150
151         if ((ret = cursor->c_close(cursor))) {
152                 syslog(LOG_EMERG, "bdb(): c_close: %s", db_strerror(ret));
153                 cdb_abort();
154         }
155 }
156
157 static void bailIfCursor(DBC ** cursors, const char *msg)
158 {
159         int i;
160
161         for (i = 0; i < MAXCDB; i++)
162                 if (cursors[i] != NULL) {
163                         syslog(LOG_EMERG, "bdb(): cursor still in progress on cdb %02x: %s", i, msg);
164                         cdb_abort();
165                 }
166 }
167
168
169 void cdb_check_handles(void)
170 {
171         bailIfCursor(TSD->cursors, "in check_handles");
172
173         if (TSD->tid != NULL) {
174                 syslog(LOG_EMERG, "bdb(): transaction still in progress!");
175                 cdb_abort();
176         }
177 }
178
179
180 /*
181  * Cull the database logs
182  */
183 static void cdb_cull_logs(void)
184 {
185         u_int32_t flags;
186         int ret;
187         char **file, **list;
188         char errmsg[SIZ];
189
190         flags = DB_ARCH_ABS;
191
192         /* Get the list of names. */
193         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
194                 syslog(LOG_ERR, "cdb_cull_logs: %s", db_strerror(ret));
195                 return;
196         }
197
198         /* Print the list of names. */
199         if (list != NULL) {
200                 for (file = list; *file != NULL; ++file) {
201                         syslog(LOG_DEBUG, "Deleting log: %s", *file);
202                         ret = unlink(*file);
203                         if (ret != 0) {
204                                 snprintf(errmsg, sizeof(errmsg),
205                                          " ** ERROR **\n \n \n "
206                                          "Citadel was unable to delete the "
207                                          "database log file '%s' because of the "
208                                          "following error:\n \n %s\n \n"
209                                          " This log file is no longer in use "
210                                          "and may be safely deleted.\n",
211                                          *file, strerror(errno));
212                                 CtdlAideMessage(errmsg, "Database Warning Message");
213                         }
214                 }
215                 free(list);
216         }
217 }
218
219 /*
220  * Manually initiate log file cull.
221  */
222 void cmd_cull(char *argbuf) {
223         if (CtdlAccessCheck(ac_internal)) return;
224         cdb_cull_logs();
225         cprintf("%d Database log file cull completed.", CIT_OK);
226 }
227
228
229 /*
230  * Request a checkpoint of the database.  Called once per minute by the thread manager.
231  */
232 void cdb_checkpoint(void)
233 {
234         int ret;
235
236         syslog(LOG_DEBUG, "-- db checkpoint --");
237         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
238
239         if (ret != 0) {
240                 syslog(LOG_EMERG, "cdb_checkpoint: txn_checkpoint: %s", db_strerror(ret));
241                 cdb_abort();
242         }
243
244         /* After a successful checkpoint, we can cull the unused logs */
245         if (config.c_auto_cull) {
246                 cdb_cull_logs();
247         }
248 }
249
250
251
252 /*
253  * Open the various databases we'll be using.  Any database which
254  * does not exist should be created.  Note that we don't need a
255  * critical section here, because there aren't any active threads
256  * manipulating the database yet.
257  */
258 void open_databases(void)
259 {
260         int ret;
261         int i;
262         char dbfilename[32];
263         u_int32_t flags = 0;
264         int dbversion_major, dbversion_minor, dbversion_patch;
265         int current_dbversion = 0;
266
267         syslog(LOG_DEBUG, "bdb(): open_databases() starting");
268         syslog(LOG_DEBUG, "Compiled db: %s", DB_VERSION_STRING);
269         syslog(LOG_INFO, "  Linked db: %s",
270                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
271
272         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
273
274         syslog(LOG_DEBUG, "Calculated dbversion: %d", current_dbversion);
275         syslog(LOG_DEBUG, "  Previous dbversion: %d", CitControl.MMdbversion);
276
277         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
278            && (CitControl.MMdbversion > current_dbversion) ) {
279                 syslog(LOG_EMERG, "You are attempting to run the Citadel server using a version");
280                 syslog(LOG_EMERG, "of Berkeley DB that is older than that which last created or");
281                 syslog(LOG_EMERG, "updated the database.  Because this would probably cause data");
282                 syslog(LOG_EMERG, "corruption or loss, the server is aborting execution now.");
283                 exit(CTDLEXIT_DB);
284         }
285
286         CitControl.MMdbversion = current_dbversion;
287         put_control();
288
289         syslog(LOG_INFO, "Linked zlib: %s\n", zlibVersion());
290
291         /*
292          * Silently try to create the database subdirectory.  If it's
293          * already there, no problem.
294          */
295         if ((mkdir(ctdl_data_dir, 0700) != 0) && (errno != EEXIST)){
296                 syslog(LOG_EMERG, 
297                               "unable to create database directory [%s]: %s", 
298                               ctdl_data_dir, strerror(errno));
299         }
300         if (chmod(ctdl_data_dir, 0700) != 0){
301                 syslog(LOG_EMERG, 
302                               "unable to set database directory accessrights [%s]: %s", 
303                               ctdl_data_dir, strerror(errno));
304         }
305         if (chown(ctdl_data_dir, CTDLUID, (-1)) != 0){
306                 syslog(LOG_EMERG, 
307                               "unable to set the owner for [%s]: %s", 
308                               ctdl_data_dir, strerror(errno));
309         }
310         syslog(LOG_DEBUG, "bdb(): Setting up DB environment\n");
311         /* db_env_set_func_yield((int (*)(u_long,  u_long))sched_yield); */
312         ret = db_env_create(&dbenv, 0);
313         if (ret) {
314                 syslog(LOG_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
315                 syslog(LOG_EMERG, "exit code %d\n", ret);
316                 exit(CTDLEXIT_DB);
317         }
318         dbenv->set_errpfx(dbenv, "citserver");
319         dbenv->set_paniccall(dbenv, dbpanic);
320         dbenv->set_errcall(dbenv, cdb_verbose_err);
321         dbenv->set_errpfx(dbenv, "ctdl");
322 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
323         dbenv->set_msgcall(dbenv, cdb_verbose_log);
324 #endif
325         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
326         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
327
328         /*
329          * We want to specify the shared memory buffer pool cachesize,
330          * but everything else is the default.
331          */
332         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
333         if (ret) {
334                 syslog(LOG_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
335                 dbenv->close(dbenv, 0);
336                 syslog(LOG_EMERG, "exit code %d\n", ret);
337                 exit(CTDLEXIT_DB);
338         }
339
340         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
341                 syslog(LOG_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
342                 dbenv->close(dbenv, 0);
343                 syslog(LOG_EMERG, "exit code %d\n", ret);
344                 exit(CTDLEXIT_DB);
345         }
346
347         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
348         syslog(LOG_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
349         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
350         if (ret == DB_RUNRECOVERY) {
351                 syslog(LOG_ALERT, "dbenv->open: %s\n", db_strerror(ret));
352                 syslog(LOG_ALERT, "Attempting recovery...\n");
353                 flags |= DB_RECOVER;
354                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
355         }
356         if (ret == DB_RUNRECOVERY) {
357                 syslog(LOG_ALERT, "dbenv->open: %s\n", db_strerror(ret));
358                 syslog(LOG_ALERT, "Attempting catastrophic recovery...\n");
359                 flags &= ~DB_RECOVER;
360                 flags |= DB_RECOVER_FATAL;
361                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
362         }
363         if (ret) {
364                 syslog(LOG_EMERG, "dbenv->open: %s\n", db_strerror(ret));
365                 dbenv->close(dbenv, 0);
366                 syslog(LOG_EMERG, "exit code %d\n", ret);
367                 exit(CTDLEXIT_DB);
368         }
369
370         syslog(LOG_INFO, "Starting up DB\n");
371
372         for (i = 0; i < MAXCDB; ++i) {
373
374                 /* Create a database handle */
375                 ret = db_create(&dbp[i], dbenv, 0);
376                 if (ret) {
377                         syslog(LOG_EMERG, "db_create: %s\n", db_strerror(ret));
378                         syslog(LOG_EMERG, "exit code %d\n", ret);
379                         exit(CTDLEXIT_DB);
380                 }
381
382
383                 /* Arbitrary names for our tables -- we reference them by
384                  * number, so we don't have string names for them.
385                  */
386                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
387
388                 ret = dbp[i]->open(dbp[i],
389                                    NULL,
390                                    dbfilename,
391                                    NULL,
392                                    DB_BTREE,
393                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
394                                    0600
395                 );
396                 if (ret) {
397                         syslog(LOG_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
398                         if (ret == ENOMEM) {
399                                 syslog(LOG_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
400                         }
401                         syslog(LOG_EMERG, "exit code %d\n", ret);
402                         exit(CTDLEXIT_DB);
403                 }
404         }
405
406 }
407
408
409 /* Make sure we own all the files, because in a few milliseconds
410  * we're going to drop root privs.
411  */
412 void cdb_chmod_data(void) {
413         DIR *dp;
414         struct dirent *d;
415         char filename[PATH_MAX];
416
417         dp = opendir(ctdl_data_dir);
418         if (dp != NULL) {
419                 while (d = readdir(dp), d != NULL) {
420                         if (d->d_name[0] != '.') {
421                                 snprintf(filename, sizeof filename,
422                                          "%s/%s", ctdl_data_dir, d->d_name);
423                                 syslog(LOG_DEBUG, "chmod(%s, 0600) returned %d\n",
424                                         filename, chmod(filename, 0600)
425                                 );
426                                 syslog(LOG_DEBUG, "chown(%s, CTDLUID, -1) returned %d\n",
427                                         filename, chown(filename, CTDLUID, (-1))
428                                 );
429                         }
430                 }
431                 closedir(dp);
432         }
433
434         syslog(LOG_DEBUG, "open_databases() finished\n");
435         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
436 }
437
438
439 /*
440  * Close all of the db database files we've opened.  This can be done
441  * in a loop, since it's just a bunch of closes.
442  */
443 void close_databases(void)
444 {
445         int a;
446         int ret;
447
448         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
449                 syslog(LOG_EMERG,
450                         "txn_checkpoint: %s\n", db_strerror(ret));
451         }
452
453         /* print some statistics... */
454 #ifdef DB_STAT_ALL
455         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
456 #endif
457
458         /* close the tables */
459         for (a = 0; a < MAXCDB; ++a) {
460                 syslog(LOG_INFO, "Closing database %02x\n", a);
461                 ret = dbp[a]->close(dbp[a], 0);
462                 if (ret) {
463                         syslog(LOG_EMERG, "db_close: %s\n", db_strerror(ret));
464                 }
465
466         }
467
468         /* Close the handle. */
469         ret = dbenv->close(dbenv, 0);
470         if (ret) {
471                 syslog(LOG_EMERG, "DBENV->close: %s\n", db_strerror(ret));
472         }
473 }
474
475
476 /*
477  * Decompress a database item if it was compressed on disk
478  */
479 void cdb_decompress_if_necessary(struct cdbdata *cdb)
480 {
481         static int magic = COMPRESS_MAGIC;
482
483         if ((cdb == NULL) || 
484             (cdb->ptr == NULL) || 
485             (cdb->len < sizeof(magic)) ||
486             (memcmp(cdb->ptr, &magic, sizeof(magic))))
487             return;
488
489         /* At this point we know we're looking at a compressed item. */
490
491         struct CtdlCompressHeader zheader;
492         char *uncompressed_data;
493         char *compressed_data;
494         uLongf destLen, sourceLen;
495         size_t cplen;
496
497         memset(&zheader, 0, sizeof(struct CtdlCompressHeader));
498         cplen = sizeof(struct CtdlCompressHeader);
499         if (sizeof(struct CtdlCompressHeader) > cdb->len)
500                 cplen = cdb->len;
501         memcpy(&zheader, cdb->ptr, cplen);
502
503         compressed_data = cdb->ptr;
504         compressed_data += sizeof(struct CtdlCompressHeader);
505
506         sourceLen = (uLongf) zheader.compressed_len;
507         destLen = (uLongf) zheader.uncompressed_len;
508         uncompressed_data = malloc(zheader.uncompressed_len);
509
510         if (uncompress((Bytef *) uncompressed_data,
511                        (uLongf *) & destLen,
512                        (const Bytef *) compressed_data,
513                        (uLong) sourceLen) != Z_OK) {
514                 syslog(LOG_EMERG, "uncompress() error\n");
515                 cdb_abort();
516         }
517
518         free(cdb->ptr);
519         cdb->len = (size_t) destLen;
520         cdb->ptr = uncompressed_data;
521 }
522
523
524
525 /*
526  * Store a piece of data.  Returns 0 if the operation was successful.  If a
527  * key already exists it should be overwritten.
528  */
529 int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
530 {
531
532         DBT dkey, ddata;
533         DB_TXN *tid;
534         int ret = 0;
535
536         struct CtdlCompressHeader zheader;
537         char *compressed_data = NULL;
538         int compressing = 0;
539         size_t buffer_len = 0;
540         uLongf destLen = 0;
541
542         memset(&dkey, 0, sizeof(DBT));
543         memset(&ddata, 0, sizeof(DBT));
544         dkey.size = ckeylen;
545         dkey.data = ckey;
546         ddata.size = cdatalen;
547         ddata.data = cdata;
548
549         /* Only compress Visit records.  Everything else is uncompressed. */
550         if (cdb == CDB_VISIT) {
551                 compressing = 1;
552                 zheader.magic = COMPRESS_MAGIC;
553                 zheader.uncompressed_len = cdatalen;
554                 buffer_len = ((cdatalen * 101) / 100) + 100
555                     + sizeof(struct CtdlCompressHeader);
556                 destLen = (uLongf) buffer_len;
557                 compressed_data = malloc(buffer_len);
558                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
559                         &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
560                 {
561                         syslog(LOG_EMERG, "compress2() error\n");
562                         cdb_abort();
563                 }
564                 zheader.compressed_len = (size_t) destLen;
565                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
566                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
567                 ddata.data = compressed_data;
568         }
569
570         if (TSD->tid != NULL) {
571                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
572                                     TSD->tid,   /* transaction ID */
573                                     &dkey,      /* key */
574                                     &ddata,     /* data */
575                                     0); /* flags */
576                 if (ret) {
577                         syslog(LOG_EMERG, "cdb_store(%d): %s", cdb, db_strerror(ret));
578                         cdb_abort();
579                 }
580                 if (compressing)
581                         free(compressed_data);
582                 return ret;
583
584         } else {
585                 bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
586
587               retry:
588                 txbegin(&tid);
589
590                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
591                                          tid,   /* transaction ID */
592                                          &dkey, /* key */
593                                          &ddata,        /* data */
594                                          0))) { /* flags */
595                         if (ret == DB_LOCK_DEADLOCK) {
596                                 txabort(tid);
597                                 goto retry;
598                         } else {
599                                 syslog(LOG_EMERG, "cdb_store(%d): %s", cdb, db_strerror(ret));
600                                 cdb_abort();
601                         }
602                 } else {
603                         txcommit(tid);
604                         if (compressing) {
605                                 free(compressed_data);
606                         }
607                         return ret;
608                 }
609         }
610         return ret;
611 }
612
613
614 /*
615  * Delete a piece of data.  Returns 0 if the operation was successful.
616  */
617 int cdb_delete(int cdb, void *key, int keylen)
618 {
619
620         DBT dkey;
621         DB_TXN *tid;
622         int ret;
623
624         memset(&dkey, 0, sizeof dkey);
625         dkey.size = keylen;
626         dkey.data = key;
627
628         if (TSD->tid != NULL) {
629                 ret = dbp[cdb]->del(dbp[cdb], TSD->tid, &dkey, 0);
630                 if (ret) {
631                         syslog(LOG_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
632                         if (ret != DB_NOTFOUND) {
633                                 cdb_abort();
634                         }
635                 }
636         } else {
637                 bailIfCursor(TSD->cursors, "attempt to delete during r/o cursor");
638
639               retry:
640                 txbegin(&tid);
641
642                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
643                     && ret != DB_NOTFOUND) {
644                         if (ret == DB_LOCK_DEADLOCK) {
645                                 txabort(tid);
646                                 goto retry;
647                         } else {
648                                 syslog(LOG_EMERG, "cdb_delete(%d): %s\n",
649                                         cdb, db_strerror(ret));
650                                 cdb_abort();
651                         }
652                 } else {
653                         txcommit(tid);
654                 }
655         }
656         return ret;
657 }
658
659 static DBC *localcursor(int cdb)
660 {
661         int ret;
662         DBC *curs;
663
664         if (TSD->cursors[cdb] == NULL)
665                 ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &curs, 0);
666         else
667                 ret = TSD->cursors[cdb]->c_dup(TSD->cursors[cdb], &curs, DB_POSITION);
668
669         if (ret) {
670                 syslog(LOG_EMERG, "localcursor: %s\n", db_strerror(ret));
671                 cdb_abort();
672         }
673
674         return curs;
675 }
676
677
678 /*
679  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
680  * a struct cdbdata which it is the caller's responsibility to free later on
681  * using the cdb_free() routine.
682  */
683 struct cdbdata *cdb_fetch(int cdb, const void *key, int keylen)
684 {
685
686         struct cdbdata *tempcdb;
687         DBT dkey, dret;
688         int ret;
689
690         memset(&dkey, 0, sizeof(DBT));
691         dkey.size = keylen;
692         dkey.data = key;
693
694         if (TSD->tid != NULL) {
695                 memset(&dret, 0, sizeof(DBT));
696                 dret.flags = DB_DBT_MALLOC;
697                 ret = dbp[cdb]->get(dbp[cdb], TSD->tid, &dkey, &dret, 0);
698         } else {
699                 DBC *curs;
700
701                 do {
702                         memset(&dret, 0, sizeof(DBT));
703                         dret.flags = DB_DBT_MALLOC;
704
705                         curs = localcursor(cdb);
706
707                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
708                         cclose(curs);
709                 }
710                 while (ret == DB_LOCK_DEADLOCK);
711
712         }
713
714         if ((ret != 0) && (ret != DB_NOTFOUND)) {
715                 syslog(LOG_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
716                 cdb_abort();
717         }
718
719         if (ret != 0)
720                 return NULL;
721         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
722
723         if (tempcdb == NULL) {
724                 syslog(LOG_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
725                 cdb_abort();
726         }
727
728         tempcdb->len = dret.size;
729         tempcdb->ptr = dret.data;
730         cdb_decompress_if_necessary(tempcdb);
731         return (tempcdb);
732 }
733
734
735 /*
736  * Free a cdbdata item.
737  *
738  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
739  * other code to assume ownership of that memory simply by storing the
740  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
741  * avoid freeing it.
742  */
743 void cdb_free(struct cdbdata *cdb)
744 {
745         if (cdb->ptr) {
746                 free(cdb->ptr);
747         }
748         free(cdb);
749 }
750
751 void cdb_close_cursor(int cdb)
752 {
753         if (TSD->cursors[cdb] != NULL) {
754                 cclose(TSD->cursors[cdb]);
755         }
756
757         TSD->cursors[cdb] = NULL;
758 }
759
760 /* 
761  * Prepare for a sequential search of an entire database.
762  * (There is guaranteed to be no more than one traversal in
763  * progress per thread at any given time.)
764  */
765 void cdb_rewind(int cdb)
766 {
767         int ret = 0;
768
769         if (TSD->cursors[cdb] != NULL) {
770                 syslog(LOG_EMERG,
771                         "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
772                 cdb_abort();
773                 /* cclose(TSD->cursors[cdb]); */
774         }
775
776         /*
777          * Now initialize the cursor
778          */
779         ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &TSD->cursors[cdb], 0);
780         if (ret) {
781                 syslog(LOG_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
782                 cdb_abort();
783         }
784 }
785
786
787 /*
788  * Fetch the next item in a sequential search.  Returns a pointer to a 
789  * cdbdata structure, or NULL if we've hit the end.
790  */
791 struct cdbdata *cdb_next_item(int cdb)
792 {
793         DBT key, data;
794         struct cdbdata *cdbret;
795         int ret = 0;
796
797         /* Initialize the key/data pair so the flags aren't set. */
798         memset(&key, 0, sizeof(key));
799         memset(&data, 0, sizeof(data));
800         data.flags = DB_DBT_MALLOC;
801
802         ret = TSD->cursors[cdb]->c_get(TSD->cursors[cdb], &key, &data, DB_NEXT);
803
804         if (ret) {
805                 if (ret != DB_NOTFOUND) {
806                         syslog(LOG_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
807                         cdb_abort();
808                 }
809                 cdb_close_cursor(cdb);
810                 return NULL;    /* presumably, end of file */
811         }
812
813         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
814         cdbret->len = data.size;
815         cdbret->ptr = data.data;
816         cdb_decompress_if_necessary(cdbret);
817
818         return (cdbret);
819 }
820
821
822
823 /*
824  * Transaction-based stuff.  I'm writing this as I bake cookies...
825  */
826
827 void cdb_begin_transaction(void)
828 {
829
830         bailIfCursor(TSD->cursors, "can't begin transaction during r/o cursor");
831
832         if (TSD->tid != NULL) {
833                 syslog(LOG_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
834                 cdb_abort();
835         }
836
837         txbegin(&TSD->tid);
838 }
839
840 void cdb_end_transaction(void)
841 {
842         int i;
843
844         for (i = 0; i < MAXCDB; i++)
845                 if (TSD->cursors[i] != NULL) {
846                         syslog(LOG_WARNING,
847                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
848                                 i);
849                         cclose(TSD->cursors[i]);
850                         TSD->cursors[i] = NULL;
851                 }
852
853         if (TSD->tid == NULL) {
854                 syslog(LOG_EMERG,
855                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
856                 cdb_abort();
857         } else {
858                 txcommit(TSD->tid);
859         }
860
861         TSD->tid = NULL;
862 }
863
864 /*
865  * Truncate (delete every record)
866  */
867 void cdb_trunc(int cdb)
868 {
869         /* DB_TXN *tid; */
870         int ret;
871         u_int32_t count;
872
873         if (TSD->tid != NULL) {
874                 syslog(LOG_EMERG, "cdb_trunc must not be called in a transaction.");
875                 cdb_abort();
876         } else {
877                 bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
878
879               retry:
880                 /* txbegin(&tid); */
881
882                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
883                                               NULL,     /* transaction ID */
884                                               &count,   /* #rows deleted */
885                                               0))) {    /* flags */
886                         if (ret == DB_LOCK_DEADLOCK) {
887                                 /* txabort(tid); */
888                                 goto retry;
889                         } else {
890                                 syslog(LOG_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
891                                 if (ret == ENOMEM) {
892                                         syslog(LOG_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.");
893                                 }
894                                 exit(CTDLEXIT_DB);
895                         }
896                 } else {
897                         /* txcommit(tid); */
898                 }
899         }
900 }