d0aab07300d56305ba0c4203c74445b03c1317ad
[citadel.git] / citadel / database.c
1 /*
2  * This is a data store backend for the Citadel server which uses Berkeley DB.
3  *
4  * Copyright (c) 1987-2011 by the citadel.org team
5  *
6  * This program is open source software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License as published
8  * by the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  */
21
22 /*****************************************************************************
23        Tunable configuration parameters for the Berkeley DB back end
24  *****************************************************************************/
25
26 /* Citadel will checkpoint the db at the end of every session, but only if
27  * the specified number of kilobytes has been written, or if the specified
28  * number of minutes has passed, since the last checkpoint.
29  */
30 #define MAX_CHECKPOINT_KBYTES   256
31 #define MAX_CHECKPOINT_MINUTES  15
32
33 /*****************************************************************************/
34
35 #include "sysdep.h"
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <stdio.h>
39 #include <ctype.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <dirent.h>
45 #include <syslog.h>
46
47 #ifdef HAVE_DB_H
48 #include <db.h>
49 #elif defined(HAVE_DB4_DB_H)
50 #include <db4/db.h>
51 #else
52 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
53 #endif
54
55
56 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
57 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
58 #endif
59
60
61 #include <libcitadel.h>
62 #include "citadel.h"
63 #include "server.h"
64 #include "citserver.h"
65 #include "database.h"
66 #include "msgbase.h"
67 #include "sysdep_decls.h"
68 #include "threads.h"
69 #include "config.h"
70 #include "control.h"
71
72 #include "ctdl_module.h"
73
74
75 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
76 static DB_ENV *dbenv;           /* The DB environment (global) */
77
78
79 #ifdef HAVE_ZLIB
80 #include <zlib.h>
81 #endif
82
83
84 /* Verbose logging callback */
85 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
86 {
87         if (!IsEmptyStr(msg)) {
88                 syslog(LOG_DEBUG, "DB: %s\n", msg);
89         }
90 }
91
92
93 /* Verbose logging callback */
94 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
95 {
96         syslog(LOG_ALERT, "DB: %s\n", msg);
97 }
98
99
100 /* just a little helper function */
101 static void txabort(DB_TXN * tid)
102 {
103         int ret;
104
105         ret = tid->abort(tid);
106
107         if (ret) {
108                 syslog(LOG_EMERG, "bdb(): txn_abort: %s\n", db_strerror(ret));
109                 abort();
110         }
111 }
112
113 /* this one is even more helpful than the last. */
114 static void txcommit(DB_TXN * tid)
115 {
116         int ret;
117
118         ret = tid->commit(tid, 0);
119
120         if (ret) {
121                 syslog(LOG_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret));
122                 abort();
123         }
124 }
125
126 /* are you sensing a pattern yet? */
127 static void txbegin(DB_TXN ** tid)
128 {
129         int ret;
130
131         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
132
133         if (ret) {
134                 syslog(LOG_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret));
135                 abort();
136         }
137 }
138
139 static void dbpanic(DB_ENV * env, int errval)
140 {
141         syslog(LOG_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval));
142 }
143
144 static void cclose(DBC * cursor)
145 {
146         int ret;
147
148         if ((ret = cursor->c_close(cursor))) {
149                 syslog(LOG_EMERG, "bdb(): c_close: %s\n", db_strerror(ret));
150                 abort();
151         }
152 }
153
154 static void bailIfCursor(DBC ** cursors, const char *msg)
155 {
156         int i;
157
158         for (i = 0; i < MAXCDB; i++)
159                 if (cursors[i] != NULL) {
160                         syslog(LOG_EMERG,
161                                 "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg);
162                         abort();
163                 }
164 }
165
166 void check_handles(void *arg)
167 {
168         if (arg != NULL) {
169                 ThreadTSD *tsd = (ThreadTSD *) arg;
170
171                 bailIfCursor(tsd->cursors, "in check_handles");
172
173                 if (tsd->tid != NULL) {
174                         syslog(LOG_EMERG, "bdb(): transaction still in progress!");
175                         abort();
176                 }
177         }
178 }
179
180 void cdb_check_handles(void)
181 {
182         check_handles(pthread_getspecific(ThreadKey));
183 }
184
185
186 /*
187  * Cull the database logs
188  */
189 static void cdb_cull_logs(void)
190 {
191         u_int32_t flags;
192         int ret;
193         char **file, **list;
194         char errmsg[SIZ];
195
196         flags = DB_ARCH_ABS;
197
198         /* Get the list of names. */
199         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
200                 syslog(LOG_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
201                 return;
202         }
203
204         /* Print the list of names. */
205         if (list != NULL) {
206                 for (file = list; *file != NULL; ++file) {
207                         syslog(LOG_DEBUG, "Deleting log: %s\n", *file);
208                         ret = unlink(*file);
209                         if (ret != 0) {
210                                 snprintf(errmsg, sizeof(errmsg),
211                                          " ** ERROR **\n \n \n "
212                                          "Citadel was unable to delete the "
213                                          "database log file '%s' because of the "
214                                          "following error:\n \n %s\n \n"
215                                          " This log file is no longer in use "
216                                          "and may be safely deleted.\n",
217                                          *file, strerror(errno));
218                                 CtdlAideMessage(errmsg, "Database Warning Message");
219                         }
220                 }
221                 free(list);
222         }
223 }
224
225 /*
226  * Manually initiate log file cull.
227  */
228 void cmd_cull(char *argbuf) {
229         if (CtdlAccessCheck(ac_internal)) return;
230         cdb_cull_logs();
231         cprintf("%d Database log file cull completed.\n", CIT_OK);
232 }
233
234
235 /*
236  * Request a checkpoint of the database.  Called once per minute by the thread manager.
237  */
238 void cdb_checkpoint(void)
239 {
240         int ret;
241
242         syslog(LOG_DEBUG, "-- db checkpoint --\n");
243         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
244
245         if (ret != 0) {
246                 syslog(LOG_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
247                 abort();
248         }
249
250         /* After a successful checkpoint, we can cull the unused logs */
251         if (config.c_auto_cull) {
252                 cdb_cull_logs();
253         }
254 }
255
256
257
258 /*
259  * Open the various databases we'll be using.  Any database which
260  * does not exist should be created.  Note that we don't need a
261  * critical section here, because there aren't any active threads
262  * manipulating the database yet.
263  */
264 void open_databases(void)
265 {
266         int ret;
267         int i;
268         char dbfilename[32];
269         u_int32_t flags = 0;
270         int dbversion_major, dbversion_minor, dbversion_patch;
271         int current_dbversion = 0;
272
273         syslog(LOG_DEBUG, "bdb(): open_databases() starting\n");
274         syslog(LOG_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
275         syslog(LOG_INFO, "  Linked db: %s\n",
276                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
277
278         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
279
280         syslog(LOG_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
281         syslog(LOG_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
282
283         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
284            && (CitControl.MMdbversion > current_dbversion) ) {
285                 syslog(LOG_EMERG, "You are attempting to run the Citadel server using a version\n"
286                                         "of Berkeley DB that is older than that which last created or\n"
287                                         "updated the database.  Because this would probably cause data\n"
288                                         "corruption or loss, the server is aborting execution now.\n");
289                 exit(CTDLEXIT_DB);
290         }
291
292         CitControl.MMdbversion = current_dbversion;
293         put_control();
294
295 #ifdef HAVE_ZLIB
296         syslog(LOG_INFO, "Linked zlib: %s\n", zlibVersion());
297 #endif
298
299         /*
300          * Silently try to create the database subdirectory.  If it's
301          * already there, no problem.
302          */
303         if ((mkdir(ctdl_data_dir, 0700) != 0) && (errno != EEXIST)){
304                 syslog(LOG_EMERG, 
305                               "unable to create database directory [%s]: %s", 
306                               ctdl_data_dir, strerror(errno));
307         }
308         if (chmod(ctdl_data_dir, 0700) != 0){
309                 syslog(LOG_EMERG, 
310                               "unable to set database directory accessrights [%s]: %s", 
311                               ctdl_data_dir, strerror(errno));
312         }
313         if (chown(ctdl_data_dir, CTDLUID, (-1)) != 0){
314                 syslog(LOG_EMERG, 
315                               "unable to set the owner for [%s]: %s", 
316                               ctdl_data_dir, strerror(errno));
317         }
318         syslog(LOG_DEBUG, "bdb(): Setting up DB environment\n");
319         db_env_set_func_yield((int (*)(u_long,  u_long))sched_yield);
320         ret = db_env_create(&dbenv, 0);
321         if (ret) {
322                 syslog(LOG_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
323                 syslog(LOG_EMERG, "exit code %d\n", ret);
324                 exit(CTDLEXIT_DB);
325         }
326         dbenv->set_errpfx(dbenv, "citserver");
327         dbenv->set_paniccall(dbenv, dbpanic);
328         dbenv->set_errcall(dbenv, cdb_verbose_err);
329         dbenv->set_errpfx(dbenv, "ctdl");
330 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
331         dbenv->set_msgcall(dbenv, cdb_verbose_log);
332 #endif
333         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
334         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
335
336         /*
337          * We want to specify the shared memory buffer pool cachesize,
338          * but everything else is the default.
339          */
340         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
341         if (ret) {
342                 syslog(LOG_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
343                 dbenv->close(dbenv, 0);
344                 syslog(LOG_EMERG, "exit code %d\n", ret);
345                 exit(CTDLEXIT_DB);
346         }
347
348         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
349                 syslog(LOG_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
350                 dbenv->close(dbenv, 0);
351                 syslog(LOG_EMERG, "exit code %d\n", ret);
352                 exit(CTDLEXIT_DB);
353         }
354
355         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
356         syslog(LOG_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
357         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
358         if (ret == DB_RUNRECOVERY) {
359                 syslog(LOG_ALERT, "dbenv->open: %s\n", db_strerror(ret));
360                 syslog(LOG_ALERT, "Attempting recovery...\n");
361                 flags |= DB_RECOVER;
362                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
363         }
364         if (ret == DB_RUNRECOVERY) {
365                 syslog(LOG_ALERT, "dbenv->open: %s\n", db_strerror(ret));
366                 syslog(LOG_ALERT, "Attempting catastrophic recovery...\n");
367                 flags &= ~DB_RECOVER;
368                 flags |= DB_RECOVER_FATAL;
369                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
370         }
371         if (ret) {
372                 syslog(LOG_EMERG, "dbenv->open: %s\n", db_strerror(ret));
373                 dbenv->close(dbenv, 0);
374                 syslog(LOG_EMERG, "exit code %d\n", ret);
375                 exit(CTDLEXIT_DB);
376         }
377
378         syslog(LOG_INFO, "Starting up DB\n");
379
380         for (i = 0; i < MAXCDB; ++i) {
381
382                 /* Create a database handle */
383                 ret = db_create(&dbp[i], dbenv, 0);
384                 if (ret) {
385                         syslog(LOG_EMERG, "db_create: %s\n", db_strerror(ret));
386                         syslog(LOG_EMERG, "exit code %d\n", ret);
387                         exit(CTDLEXIT_DB);
388                 }
389
390
391                 /* Arbitrary names for our tables -- we reference them by
392                  * number, so we don't have string names for them.
393                  */
394                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
395
396                 ret = dbp[i]->open(dbp[i],
397                                    NULL,
398                                    dbfilename,
399                                    NULL,
400                                    DB_BTREE,
401                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
402                                    0600
403                 );
404                 if (ret) {
405                         syslog(LOG_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
406                         if (ret == ENOMEM) {
407                                 syslog(LOG_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
408                         }
409                         syslog(LOG_EMERG, "exit code %d\n", ret);
410                         exit(CTDLEXIT_DB);
411                 }
412         }
413
414 }
415
416
417 /* Make sure we own all the files, because in a few milliseconds
418  * we're going to drop root privs.
419  */
420 void cdb_chmod_data(void) {
421         DIR *dp;
422         struct dirent *d;
423         char filename[PATH_MAX];
424
425         dp = opendir(ctdl_data_dir);
426         if (dp != NULL) {
427                 while (d = readdir(dp), d != NULL) {
428                         if (d->d_name[0] != '.') {
429                                 snprintf(filename, sizeof filename,
430                                          "%s/%s", ctdl_data_dir, d->d_name);
431                                 syslog(LOG_DEBUG, "chmod(%s, 0600) returned %d\n",
432                                         filename, chmod(filename, 0600)
433                                 );
434                                 syslog(LOG_DEBUG, "chown(%s, CTDLUID, -1) returned %d\n",
435                                         filename, chown(filename, CTDLUID, (-1))
436                                 );
437                         }
438                 }
439                 closedir(dp);
440         }
441
442         syslog(LOG_DEBUG, "open_databases() finished\n");
443         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
444 }
445
446
447 /*
448  * Close all of the db database files we've opened.  This can be done
449  * in a loop, since it's just a bunch of closes.
450  */
451 void close_databases(void)
452 {
453         int a;
454         int ret;
455
456         ctdl_thread_internal_free_tsd();
457         
458         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
459                 syslog(LOG_EMERG,
460                         "txn_checkpoint: %s\n", db_strerror(ret));
461         }
462
463         /* print some statistics... */
464 #ifdef DB_STAT_ALL
465         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
466 #endif
467
468         /* close the tables */
469         for (a = 0; a < MAXCDB; ++a) {
470                 syslog(LOG_INFO, "Closing database %02x\n", a);
471                 ret = dbp[a]->close(dbp[a], 0);
472                 if (ret) {
473                         syslog(LOG_EMERG, "db_close: %s\n", db_strerror(ret));
474                 }
475
476         }
477
478         /* Close the handle. */
479         ret = dbenv->close(dbenv, 0);
480         if (ret) {
481                 syslog(LOG_EMERG, "DBENV->close: %s\n", db_strerror(ret));
482         }
483 }
484
485
486 /*
487  * Compression functions only used if we have zlib
488  */
489 void cdb_decompress_if_necessary(struct cdbdata *cdb)
490 {
491         static int magic = COMPRESS_MAGIC;
492
493         if ((cdb == NULL) || 
494             (cdb->ptr == NULL) || 
495             (cdb->len < sizeof(magic)) ||
496             (memcmp(cdb->ptr, &magic, sizeof(magic))))
497             return;
498
499 #ifdef HAVE_ZLIB
500         /* At this point we know we're looking at a compressed item. */
501
502         struct CtdlCompressHeader zheader;
503         char *uncompressed_data;
504         char *compressed_data;
505         uLongf destLen, sourceLen;
506         size_t cplen;
507
508         memset(&zheader, 0, sizeof(struct CtdlCompressHeader));
509         cplen = sizeof(struct CtdlCompressHeader);
510         if (sizeof(struct CtdlCompressHeader) > cdb->len)
511                 cplen = cdb->len;
512         memcpy(&zheader, cdb->ptr, cplen);
513
514         compressed_data = cdb->ptr;
515         compressed_data += sizeof(struct CtdlCompressHeader);
516
517         sourceLen = (uLongf) zheader.compressed_len;
518         destLen = (uLongf) zheader.uncompressed_len;
519         uncompressed_data = malloc(zheader.uncompressed_len);
520
521         if (uncompress((Bytef *) uncompressed_data,
522                        (uLongf *) & destLen,
523                        (const Bytef *) compressed_data,
524                        (uLong) sourceLen) != Z_OK) {
525                 syslog(LOG_EMERG, "uncompress() error\n");
526                 abort();
527         }
528
529         free(cdb->ptr);
530         cdb->len = (size_t) destLen;
531         cdb->ptr = uncompressed_data;
532 #else                           /* HAVE_ZLIB */
533         syslog(LOG_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
534         abort();
535 #endif                          /* HAVE_ZLIB */
536 }
537
538
539
540 /*
541  * Store a piece of data.  Returns 0 if the operation was successful.  If a
542  * key already exists it should be overwritten.
543  */
544 int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
545 {
546
547         DBT dkey, ddata;
548         DB_TXN *tid;
549         int ret = 0;
550
551 #ifdef HAVE_ZLIB
552         struct CtdlCompressHeader zheader;
553         char *compressed_data = NULL;
554         int compressing = 0;
555         size_t buffer_len = 0;
556         uLongf destLen = 0;
557 #endif
558
559         memset(&dkey, 0, sizeof(DBT));
560         memset(&ddata, 0, sizeof(DBT));
561         dkey.size = ckeylen;
562         dkey.data = ckey;
563         ddata.size = cdatalen;
564         ddata.data = cdata;
565
566 #ifdef HAVE_ZLIB
567         /* Only compress Visit records.  Everything else is uncompressed. */
568         if (cdb == CDB_VISIT) {
569                 compressing = 1;
570                 zheader.magic = COMPRESS_MAGIC;
571                 zheader.uncompressed_len = cdatalen;
572                 buffer_len = ((cdatalen * 101) / 100) + 100
573                     + sizeof(struct CtdlCompressHeader);
574                 destLen = (uLongf) buffer_len;
575                 compressed_data = malloc(buffer_len);
576                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
577                         &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
578                 {
579                         syslog(LOG_EMERG, "compress2() error\n");
580                         abort();
581                 }
582                 zheader.compressed_len = (size_t) destLen;
583                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
584                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
585                 ddata.data = compressed_data;
586         }
587 #endif
588
589         if (MYTID != NULL) {
590                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
591                                     MYTID,      /* transaction ID */
592                                     &dkey,      /* key */
593                                     &ddata,     /* data */
594                                     0); /* flags */
595                 if (ret) {
596                         syslog(LOG_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
597                         abort();
598                 }
599 #ifdef HAVE_ZLIB
600                 if (compressing)
601                         free(compressed_data);
602 #endif
603                 return ret;
604
605         } else {
606                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
607
608               retry:
609                 txbegin(&tid);
610
611                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
612                                          tid,   /* transaction ID */
613                                          &dkey, /* key */
614                                          &ddata,        /* data */
615                                          0))) { /* flags */
616                         if (ret == DB_LOCK_DEADLOCK) {
617                                 txabort(tid);
618                                 goto retry;
619                         } else {
620                                 syslog(LOG_EMERG, "cdb_store(%d): %s\n",
621                                         cdb, db_strerror(ret));
622                                 abort();
623                         }
624                 } else {
625                         txcommit(tid);
626 #ifdef HAVE_ZLIB
627                         if (compressing) {
628                                 free(compressed_data);
629                         }
630 #endif
631                         return ret;
632                 }
633         }
634 }
635
636
637 /*
638  * Delete a piece of data.  Returns 0 if the operation was successful.
639  */
640 int cdb_delete(int cdb, void *key, int keylen)
641 {
642
643         DBT dkey;
644         DB_TXN *tid;
645         int ret;
646
647         memset(&dkey, 0, sizeof dkey);
648         dkey.size = keylen;
649         dkey.data = key;
650
651         if (MYTID != NULL) {
652                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
653                 if (ret) {
654                         syslog(LOG_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
655                         if (ret != DB_NOTFOUND) {
656                                 abort();
657                         }
658                 }
659         } else {
660                 bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
661
662               retry:
663                 txbegin(&tid);
664
665                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
666                     && ret != DB_NOTFOUND) {
667                         if (ret == DB_LOCK_DEADLOCK) {
668                                 txabort(tid);
669                                 goto retry;
670                         } else {
671                                 syslog(LOG_EMERG, "cdb_delete(%d): %s\n",
672                                         cdb, db_strerror(ret));
673                                 abort();
674                         }
675                 } else {
676                         txcommit(tid);
677                 }
678         }
679         return ret;
680 }
681
682 static DBC *localcursor(int cdb)
683 {
684         int ret;
685         DBC *curs;
686
687         if (MYCURSORS[cdb] == NULL)
688                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
689         else
690                 ret =
691                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
692                                           DB_POSITION);
693
694         if (ret) {
695                 syslog(LOG_EMERG, "localcursor: %s\n", db_strerror(ret));
696                 abort();
697         }
698
699         return curs;
700 }
701
702
703 /*
704  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
705  * a struct cdbdata which it is the caller's responsibility to free later on
706  * using the cdb_free() routine.
707  */
708 struct cdbdata *cdb_fetch(int cdb, const void *key, int keylen)
709 {
710
711         struct cdbdata *tempcdb;
712         DBT dkey, dret;
713         int ret;
714
715         memset(&dkey, 0, sizeof(DBT));
716         dkey.size = keylen;
717         dkey.data = key;
718
719         if (MYTID != NULL) {
720                 memset(&dret, 0, sizeof(DBT));
721                 dret.flags = DB_DBT_MALLOC;
722                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
723         } else {
724                 DBC *curs;
725
726                 do {
727                         memset(&dret, 0, sizeof(DBT));
728                         dret.flags = DB_DBT_MALLOC;
729
730                         curs = localcursor(cdb);
731
732                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
733                         cclose(curs);
734                 }
735                 while (ret == DB_LOCK_DEADLOCK);
736
737         }
738
739         if ((ret != 0) && (ret != DB_NOTFOUND)) {
740                 syslog(LOG_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
741                 abort();
742         }
743
744         if (ret != 0)
745                 return NULL;
746         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
747
748         if (tempcdb == NULL) {
749                 syslog(LOG_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
750                 abort();
751         }
752
753         tempcdb->len = dret.size;
754         tempcdb->ptr = dret.data;
755         cdb_decompress_if_necessary(tempcdb);
756         return (tempcdb);
757 }
758
759
760 /*
761  * Free a cdbdata item.
762  *
763  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
764  * other code to assume ownership of that memory simply by storing the
765  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
766  * avoid freeing it.
767  */
768 void cdb_free(struct cdbdata *cdb)
769 {
770         if (cdb->ptr) {
771                 free(cdb->ptr);
772         }
773         free(cdb);
774 }
775
776 void cdb_close_cursor(int cdb)
777 {
778         if (MYCURSORS[cdb] != NULL) {
779                 cclose(MYCURSORS[cdb]);
780         }
781
782         MYCURSORS[cdb] = NULL;
783 }
784
785 /* 
786  * Prepare for a sequential search of an entire database.
787  * (There is guaranteed to be no more than one traversal in
788  * progress per thread at any given time.)
789  */
790 void cdb_rewind(int cdb)
791 {
792         int ret = 0;
793
794         if (MYCURSORS[cdb] != NULL) {
795                 syslog(LOG_EMERG,
796                         "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
797                 abort();
798                 /* cclose(MYCURSORS[cdb]); */
799         }
800
801         /*
802          * Now initialize the cursor
803          */
804         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
805         if (ret) {
806                 syslog(LOG_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
807                 abort();
808         }
809 }
810
811
812 /*
813  * Fetch the next item in a sequential search.  Returns a pointer to a 
814  * cdbdata structure, or NULL if we've hit the end.
815  */
816 struct cdbdata *cdb_next_item(int cdb)
817 {
818         DBT key, data;
819         struct cdbdata *cdbret;
820         int ret = 0;
821
822         /* Initialize the key/data pair so the flags aren't set. */
823         memset(&key, 0, sizeof(key));
824         memset(&data, 0, sizeof(data));
825         data.flags = DB_DBT_MALLOC;
826
827         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
828
829         if (ret) {
830                 if (ret != DB_NOTFOUND) {
831                         syslog(LOG_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
832                         abort();
833                 }
834                 cclose(MYCURSORS[cdb]);
835                 MYCURSORS[cdb] = NULL;
836                 return NULL;    /* presumably, end of file */
837         }
838
839         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
840         cdbret->len = data.size;
841         cdbret->ptr = data.data;
842         cdb_decompress_if_necessary(cdbret);
843
844         return (cdbret);
845 }
846
847
848
849 /*
850  * Transaction-based stuff.  I'm writing this as I bake cookies...
851  */
852
853 void cdb_begin_transaction(void)
854 {
855
856         bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
857
858         if (MYTID != NULL) {
859                 syslog(LOG_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
860                 abort();
861         }
862
863         txbegin(&MYTID);
864 }
865
866 void cdb_end_transaction(void)
867 {
868         int i;
869
870         for (i = 0; i < MAXCDB; i++)
871                 if (MYCURSORS[i] != NULL) {
872                         syslog(LOG_WARNING,
873                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
874                                 i);
875                         cclose(MYCURSORS[i]);
876                         MYCURSORS[i] = NULL;
877                 }
878
879         if (MYTID == NULL) {
880                 syslog(LOG_EMERG,
881                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
882                 abort();
883         } else {
884                 txcommit(MYTID);
885         }
886
887         MYTID = NULL;
888 }
889
890 /*
891  * Truncate (delete every record)
892  */
893 void cdb_trunc(int cdb)
894 {
895         /* DB_TXN *tid; */
896         int ret;
897         u_int32_t count;
898
899         if (MYTID != NULL) {
900                 syslog(LOG_EMERG,
901                         "cdb_trunc must not be called in a transaction.\n");
902                 abort();
903         } else {
904                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
905
906               retry:
907                 /* txbegin(&tid); */
908
909                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
910                                               NULL,     /* transaction ID */
911                                               &count,   /* #rows deleted */
912                                               0))) {    /* flags */
913                         if (ret == DB_LOCK_DEADLOCK) {
914                                 /* txabort(tid); */
915                                 goto retry;
916                         } else {
917                                 syslog(LOG_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
918                                 if (ret == ENOMEM) {
919                                         syslog(LOG_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
920                                 }
921                                 abort();
922                         }
923                 } else {
924                         /* txcommit(tid); */
925                 }
926         }
927 }