* Began (but did not finish) applying GPL3+ declarations to each source file. This...
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  * Copyright (c) 1987-2009 by the citadel.org team
7  *
8  *  This program is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation; either version 3 of the License, or
11  *  (at your option) any later version.
12  *
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  *
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 /*****************************************************************************
25        Tunable configuration parameters for the Berkeley DB back end
26  *****************************************************************************/
27
28 /* Citadel will checkpoint the db at the end of every session, but only if
29  * the specified number of kilobytes has been written, or if the specified
30  * number of minutes has passed, since the last checkpoint.
31  */
32 #define MAX_CHECKPOINT_KBYTES   256
33 #define MAX_CHECKPOINT_MINUTES  15
34
35 /*****************************************************************************/
36
37 #include "sysdep.h"
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <ctype.h>
42 #include <string.h>
43 #include <errno.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <dirent.h>
47
48 #ifdef HAVE_DB_H
49 #include <db.h>
50 #elif defined(HAVE_DB4_DB_H)
51 #include <db4/db.h>
52 #else
53 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
54 #endif
55
56
57 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
58 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
59 #endif
60
61
62 #include <libcitadel.h>
63 #include "citadel.h"
64 #include "server.h"
65 #include "citserver.h"
66 #include "database.h"
67 #include "msgbase.h"
68 #include "sysdep_decls.h"
69 #include "threads.h"
70 #include "config.h"
71 #include "control.h"
72
73 #include "ctdl_module.h"
74
75
76 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
77 static DB_ENV *dbenv;           /* The DB environment (global) */
78
79
80 #ifdef HAVE_ZLIB
81 #include <zlib.h>
82 #endif
83
84
85 /* Verbose logging callback */
86 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
87 {
88         if (!IsEmptyStr(msg)) {
89                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
90         }
91 }
92
93
94 /* Verbose logging callback */
95 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
96 {
97         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
98 }
99
100
101 /* just a little helper function */
102 static void txabort(DB_TXN * tid)
103 {
104         int ret;
105
106         ret = tid->abort(tid);
107
108         if (ret) {
109                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_abort: %s\n", db_strerror(ret));
110                 abort();
111         }
112 }
113
114 /* this one is even more helpful than the last. */
115 static void txcommit(DB_TXN * tid)
116 {
117         int ret;
118
119         ret = tid->commit(tid, 0);
120
121         if (ret) {
122                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret));
123                 abort();
124         }
125 }
126
127 /* are you sensing a pattern yet? */
128 static void txbegin(DB_TXN ** tid)
129 {
130         int ret;
131
132         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
133
134         if (ret) {
135                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret));
136                 abort();
137         }
138 }
139
140 static void dbpanic(DB_ENV * env, int errval)
141 {
142         CtdlLogPrintf(CTDL_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval));
143 }
144
145 static void cclose(DBC * cursor)
146 {
147         int ret;
148
149         if ((ret = cursor->c_close(cursor))) {
150                 CtdlLogPrintf(CTDL_EMERG, "bdb(): c_close: %s\n", db_strerror(ret));
151                 abort();
152         }
153 }
154
155 static void bailIfCursor(DBC ** cursors, const char *msg)
156 {
157         int i;
158
159         for (i = 0; i < MAXCDB; i++)
160                 if (cursors[i] != NULL) {
161                         CtdlLogPrintf(CTDL_EMERG,
162                                 "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg);
163                         abort();
164                 }
165 }
166
167 void check_handles(void *arg)
168 {
169         if (arg != NULL) {
170                 ThreadTSD *tsd = (ThreadTSD *) arg;
171
172                 bailIfCursor(tsd->cursors, "in check_handles");
173
174                 if (tsd->tid != NULL) {
175                         CtdlLogPrintf(CTDL_EMERG, "bdb(): transaction still in progress!");
176                         abort();
177                 }
178         }
179 }
180
181 void cdb_check_handles(void)
182 {
183         check_handles(pthread_getspecific(ThreadKey));
184 }
185
186
187 /*
188  * Cull the database logs
189  */
190 static void cdb_cull_logs(void)
191 {
192         u_int32_t flags;
193         int ret;
194         char **file, **list;
195         char errmsg[SIZ];
196
197         flags = DB_ARCH_ABS;
198
199         /* Get the list of names. */
200         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
201                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
202                 return;
203         }
204
205         /* Print the list of names. */
206         if (list != NULL) {
207                 for (file = list; *file != NULL; ++file) {
208                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
209                         ret = unlink(*file);
210                         if (ret != 0) {
211                                 snprintf(errmsg, sizeof(errmsg),
212                                          " ** ERROR **\n \n \n "
213                                          "Citadel was unable to delete the "
214                                          "database log file '%s' because of the "
215                                          "following error:\n \n %s\n \n"
216                                          " This log file is no longer in use "
217                                          "and may be safely deleted.\n",
218                                          *file, strerror(errno));
219                                 aide_message(errmsg, "Database Warning Message");
220                         }
221                 }
222                 free(list);
223         }
224 }
225
226 /*
227  * Manually initiate log file cull.
228  */
229 void cmd_cull(char *argbuf) {
230         if (CtdlAccessCheck(ac_internal)) return;
231         cdb_cull_logs();
232         cprintf("%d Database log file cull completed.\n", CIT_OK);
233 }
234
235
236 /*
237  * Request a checkpoint of the database.  Called once per minute by the thread manager.
238  */
239 void cdb_checkpoint(void)
240 {
241         int ret;
242
243         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
244         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
245
246         if (ret != 0) {
247                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
248                 abort();
249         }
250
251         /* After a successful checkpoint, we can cull the unused logs */
252         if (config.c_auto_cull) {
253                 cdb_cull_logs();
254         }
255 }
256
257
258
259 /*
260  * Open the various databases we'll be using.  Any database which
261  * does not exist should be created.  Note that we don't need a
262  * critical section here, because there aren't any active threads
263  * manipulating the database yet.
264  */
265 void open_databases(void)
266 {
267         int ret;
268         int i;
269         char dbfilename[32];
270         u_int32_t flags = 0;
271         int dbversion_major, dbversion_minor, dbversion_patch;
272         int current_dbversion = 0;
273
274         CtdlLogPrintf(CTDL_DEBUG, "bdb(): open_databases() starting\n");
275         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
276         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
277                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
278
279         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
280
281         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
282         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
283
284         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
285            && (CitControl.MMdbversion > current_dbversion) ) {
286                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
287                                         "of Berkeley DB that is older than that which last created or\n"
288                                         "updated the database.  Because this would probably cause data\n"
289                                         "corruption or loss, the server is aborting execution now.\n");
290                 exit(CTDLEXIT_DB);
291         }
292
293         CitControl.MMdbversion = current_dbversion;
294         put_control();
295
296 #ifdef HAVE_ZLIB
297         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
298 #endif
299
300         /*
301          * Silently try to create the database subdirectory.  If it's
302          * already there, no problem.
303          */
304         mkdir(ctdl_data_dir, 0700);
305         chmod(ctdl_data_dir, 0700);
306         chown(ctdl_data_dir, CTDLUID, (-1));
307
308         CtdlLogPrintf(CTDL_DEBUG, "bdb(): Setting up DB environment\n");
309         db_env_set_func_yield(sched_yield);
310         ret = db_env_create(&dbenv, 0);
311         if (ret) {
312                 CtdlLogPrintf(CTDL_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
313                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
314                 exit(CTDLEXIT_DB);
315         }
316         dbenv->set_errpfx(dbenv, "citserver");
317         dbenv->set_paniccall(dbenv, dbpanic);
318         dbenv->set_errcall(dbenv, cdb_verbose_err);
319         dbenv->set_errpfx(dbenv, "ctdl");
320 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
321         dbenv->set_msgcall(dbenv, cdb_verbose_log);
322 #endif
323         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
324         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
325
326         /*
327          * We want to specify the shared memory buffer pool cachesize,
328          * but everything else is the default.
329          */
330         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
331         if (ret) {
332                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
333                 dbenv->close(dbenv, 0);
334                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
335                 exit(CTDLEXIT_DB);
336         }
337
338         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
339                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
340                 dbenv->close(dbenv, 0);
341                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
342                 exit(CTDLEXIT_DB);
343         }
344
345         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
346         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
347         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
348         if (ret == DB_RUNRECOVERY) {
349                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
350                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
351                 flags |= DB_RECOVER;
352                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
353         }
354         if (ret == DB_RUNRECOVERY) {
355                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
356                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
357                 flags &= ~DB_RECOVER;
358                 flags |= DB_RECOVER_FATAL;
359                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
360         }
361         if (ret) {
362                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
363                 dbenv->close(dbenv, 0);
364                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
365                 exit(CTDLEXIT_DB);
366         }
367
368         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
369
370         for (i = 0; i < MAXCDB; ++i) {
371
372                 /* Create a database handle */
373                 ret = db_create(&dbp[i], dbenv, 0);
374                 if (ret) {
375                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
376                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
377                         exit(CTDLEXIT_DB);
378                 }
379
380
381                 /* Arbitrary names for our tables -- we reference them by
382                  * number, so we don't have string names for them.
383                  */
384                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
385
386                 ret = dbp[i]->open(dbp[i],
387                                    NULL,
388                                    dbfilename,
389                                    NULL,
390                                    DB_BTREE,
391                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
392                                    0600
393                 );
394                 if (ret) {
395                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
396                         if (ret == ENOMEM) {
397                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
398                         }
399                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
400                         exit(CTDLEXIT_DB);
401                 }
402         }
403
404 }
405
406
407 /* Make sure we own all the files, because in a few milliseconds
408  * we're going to drop root privs.
409  */
410 void cdb_chmod_data(void) {
411         DIR *dp;
412         struct dirent *d;
413         char filename[PATH_MAX];
414
415         dp = opendir(ctdl_data_dir);
416         if (dp != NULL) {
417                 while (d = readdir(dp), d != NULL) {
418                         if (d->d_name[0] != '.') {
419                                 snprintf(filename, sizeof filename,
420                                          "%s/%s", ctdl_data_dir, d->d_name);
421                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
422                                         filename, chmod(filename, 0600)
423                                 );
424                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
425                                         filename, chown(filename, CTDLUID, (-1))
426                                 );
427                         }
428                 }
429                 closedir(dp);
430         }
431
432         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
433         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
434 }
435
436
437 /*
438  * Close all of the db database files we've opened.  This can be done
439  * in a loop, since it's just a bunch of closes.
440  */
441 void close_databases(void)
442 {
443         int a;
444         int ret;
445
446         ctdl_thread_internal_free_tsd();
447         
448         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
449                 CtdlLogPrintf(CTDL_EMERG,
450                         "txn_checkpoint: %s\n", db_strerror(ret));
451         }
452
453         /* print some statistics... */
454 #ifdef DB_STAT_ALL
455         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
456 #endif
457
458         /* close the tables */
459         for (a = 0; a < MAXCDB; ++a) {
460                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
461                 ret = dbp[a]->close(dbp[a], 0);
462                 if (ret) {
463                         CtdlLogPrintf(CTDL_EMERG, "db_close: %s\n", db_strerror(ret));
464                 }
465
466         }
467
468         /* Close the handle. */
469         ret = dbenv->close(dbenv, 0);
470         if (ret) {
471                 CtdlLogPrintf(CTDL_EMERG, "DBENV->close: %s\n", db_strerror(ret));
472         }
473 }
474
475
476 /*
477  * Compression functions only used if we have zlib
478  */
479 void cdb_decompress_if_necessary(struct cdbdata *cdb)
480 {
481         static int magic = COMPRESS_MAGIC;
482
483         if (cdb == NULL)
484                 return;
485         if (cdb->ptr == NULL)
486                 return;
487         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
488                 return;
489
490 #ifdef HAVE_ZLIB
491         /* At this point we know we're looking at a compressed item. */
492
493         struct CtdlCompressHeader zheader;
494         char *uncompressed_data;
495         char *compressed_data;
496         uLongf destLen, sourceLen;
497
498         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
499
500         compressed_data = cdb->ptr;
501         compressed_data += sizeof(struct CtdlCompressHeader);
502
503         sourceLen = (uLongf) zheader.compressed_len;
504         destLen = (uLongf) zheader.uncompressed_len;
505         uncompressed_data = malloc(zheader.uncompressed_len);
506
507         if (uncompress((Bytef *) uncompressed_data,
508                        (uLongf *) & destLen,
509                        (const Bytef *) compressed_data,
510                        (uLong) sourceLen) != Z_OK) {
511                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
512                 abort();
513         }
514
515         free(cdb->ptr);
516         cdb->len = (size_t) destLen;
517         cdb->ptr = uncompressed_data;
518 #else                           /* HAVE_ZLIB */
519         CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
520         abort();
521 #endif                          /* HAVE_ZLIB */
522 }
523
524
525
526 /*
527  * Store a piece of data.  Returns 0 if the operation was successful.  If a
528  * key already exists it should be overwritten.
529  */
530 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
531 {
532
533         DBT dkey, ddata;
534         DB_TXN *tid;
535         int ret = 0;
536
537 #ifdef HAVE_ZLIB
538         struct CtdlCompressHeader zheader;
539         char *compressed_data = NULL;
540         int compressing = 0;
541         size_t buffer_len = 0;
542         uLongf destLen = 0;
543 #endif
544
545         memset(&dkey, 0, sizeof(DBT));
546         memset(&ddata, 0, sizeof(DBT));
547         dkey.size = ckeylen;
548         dkey.data = ckey;
549         ddata.size = cdatalen;
550         ddata.data = cdata;
551
552 #ifdef HAVE_ZLIB
553         /* Only compress Visit records.  Everything else is uncompressed. */
554         if (cdb == CDB_VISIT) {
555                 compressing = 1;
556                 zheader.magic = COMPRESS_MAGIC;
557                 zheader.uncompressed_len = cdatalen;
558                 buffer_len = ((cdatalen * 101) / 100) + 100
559                     + sizeof(struct CtdlCompressHeader);
560                 destLen = (uLongf) buffer_len;
561                 compressed_data = malloc(buffer_len);
562                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
563                         &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
564                 {
565                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
566                         abort();
567                 }
568                 zheader.compressed_len = (size_t) destLen;
569                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
570                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
571                 ddata.data = compressed_data;
572         }
573 #endif
574
575         if (MYTID != NULL) {
576                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
577                                     MYTID,      /* transaction ID */
578                                     &dkey,      /* key */
579                                     &ddata,     /* data */
580                                     0); /* flags */
581                 if (ret) {
582                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
583                         abort();
584                 }
585 #ifdef HAVE_ZLIB
586                 if (compressing)
587                         free(compressed_data);
588 #endif
589                 return ret;
590
591         } else {
592                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
593
594               retry:
595                 txbegin(&tid);
596
597                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
598                                          tid,   /* transaction ID */
599                                          &dkey, /* key */
600                                          &ddata,        /* data */
601                                          0))) { /* flags */
602                         if (ret == DB_LOCK_DEADLOCK) {
603                                 txabort(tid);
604                                 goto retry;
605                         } else {
606                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
607                                         cdb, db_strerror(ret));
608                                 abort();
609                         }
610                 } else {
611                         txcommit(tid);
612 #ifdef HAVE_ZLIB
613                         if (compressing) {
614                                 free(compressed_data);
615                         }
616 #endif
617                         return ret;
618                 }
619         }
620 }
621
622
623 /*
624  * Delete a piece of data.  Returns 0 if the operation was successful.
625  */
626 int cdb_delete(int cdb, void *key, int keylen)
627 {
628
629         DBT dkey;
630         DB_TXN *tid;
631         int ret;
632
633         memset(&dkey, 0, sizeof dkey);
634         dkey.size = keylen;
635         dkey.data = key;
636
637         if (MYTID != NULL) {
638                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
639                 if (ret) {
640                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
641                         if (ret != DB_NOTFOUND) {
642                                 abort();
643                         }
644                 }
645         } else {
646                 bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
647
648               retry:
649                 txbegin(&tid);
650
651                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
652                     && ret != DB_NOTFOUND) {
653                         if (ret == DB_LOCK_DEADLOCK) {
654                                 txabort(tid);
655                                 goto retry;
656                         } else {
657                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
658                                         cdb, db_strerror(ret));
659                                 abort();
660                         }
661                 } else {
662                         txcommit(tid);
663                 }
664         }
665         return ret;
666 }
667
668 static DBC *localcursor(int cdb)
669 {
670         int ret;
671         DBC *curs;
672
673         if (MYCURSORS[cdb] == NULL)
674                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
675         else
676                 ret =
677                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
678                                           DB_POSITION);
679
680         if (ret) {
681                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
682                 abort();
683         }
684
685         return curs;
686 }
687
688
689 /*
690  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
691  * a struct cdbdata which it is the caller's responsibility to free later on
692  * using the cdb_free() routine.
693  */
694 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
695 {
696
697         struct cdbdata *tempcdb;
698         DBT dkey, dret;
699         int ret;
700
701         memset(&dkey, 0, sizeof(DBT));
702         dkey.size = keylen;
703         dkey.data = key;
704
705         if (MYTID != NULL) {
706                 memset(&dret, 0, sizeof(DBT));
707                 dret.flags = DB_DBT_MALLOC;
708                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
709         } else {
710                 DBC *curs;
711
712                 do {
713                         memset(&dret, 0, sizeof(DBT));
714                         dret.flags = DB_DBT_MALLOC;
715
716                         curs = localcursor(cdb);
717
718                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
719                         cclose(curs);
720                 }
721                 while (ret == DB_LOCK_DEADLOCK);
722
723         }
724
725         if ((ret != 0) && (ret != DB_NOTFOUND)) {
726                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
727                 abort();
728         }
729
730         if (ret != 0)
731                 return NULL;
732         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
733
734         if (tempcdb == NULL) {
735                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
736                 abort();
737         }
738
739         tempcdb->len = dret.size;
740         tempcdb->ptr = dret.data;
741         cdb_decompress_if_necessary(tempcdb);
742         return (tempcdb);
743 }
744
745
746 /*
747  * Free a cdbdata item.
748  *
749  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
750  * other code to assume ownership of that memory simply by storing the
751  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
752  * avoid freeing it.
753  */
754 void cdb_free(struct cdbdata *cdb)
755 {
756         if (cdb->ptr) {
757                 free(cdb->ptr);
758         }
759         free(cdb);
760 }
761
762 void cdb_close_cursor(int cdb)
763 {
764         if (MYCURSORS[cdb] != NULL) {
765                 cclose(MYCURSORS[cdb]);
766         }
767
768         MYCURSORS[cdb] = NULL;
769 }
770
771 /* 
772  * Prepare for a sequential search of an entire database.
773  * (There is guaranteed to be no more than one traversal in
774  * progress per thread at any given time.)
775  */
776 void cdb_rewind(int cdb)
777 {
778         int ret = 0;
779
780         if (MYCURSORS[cdb] != NULL) {
781                 CtdlLogPrintf(CTDL_EMERG,
782                         "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
783                 abort();
784                 /* cclose(MYCURSORS[cdb]); */
785         }
786
787         /*
788          * Now initialize the cursor
789          */
790         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
791         if (ret) {
792                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
793                 abort();
794         }
795 }
796
797
798 /*
799  * Fetch the next item in a sequential search.  Returns a pointer to a 
800  * cdbdata structure, or NULL if we've hit the end.
801  */
802 struct cdbdata *cdb_next_item(int cdb)
803 {
804         DBT key, data;
805         struct cdbdata *cdbret;
806         int ret = 0;
807
808         /* Initialize the key/data pair so the flags aren't set. */
809         memset(&key, 0, sizeof(key));
810         memset(&data, 0, sizeof(data));
811         data.flags = DB_DBT_MALLOC;
812
813         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
814
815         if (ret) {
816                 if (ret != DB_NOTFOUND) {
817                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
818                         abort();
819                 }
820                 cclose(MYCURSORS[cdb]);
821                 MYCURSORS[cdb] = NULL;
822                 return NULL;    /* presumably, end of file */
823         }
824
825         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
826         cdbret->len = data.size;
827         cdbret->ptr = data.data;
828         cdb_decompress_if_necessary(cdbret);
829
830         return (cdbret);
831 }
832
833
834
835 /*
836  * Transaction-based stuff.  I'm writing this as I bake cookies...
837  */
838
839 void cdb_begin_transaction(void)
840 {
841
842         bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
843
844         if (MYTID != NULL) {
845                 CtdlLogPrintf(CTDL_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
846                 abort();
847         }
848
849         txbegin(&MYTID);
850 }
851
852 void cdb_end_transaction(void)
853 {
854         int i;
855
856         for (i = 0; i < MAXCDB; i++)
857                 if (MYCURSORS[i] != NULL) {
858                         CtdlLogPrintf(CTDL_WARNING,
859                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
860                                 i);
861                         cclose(MYCURSORS[i]);
862                         MYCURSORS[i] = NULL;
863                 }
864
865         if (MYTID == NULL) {
866                 CtdlLogPrintf(CTDL_EMERG,
867                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
868                 abort();
869         } else {
870                 txcommit(MYTID);
871         }
872
873         MYTID = NULL;
874 }
875
876 /*
877  * Truncate (delete every record)
878  */
879 void cdb_trunc(int cdb)
880 {
881         /* DB_TXN *tid; */
882         int ret;
883         u_int32_t count;
884
885         if (MYTID != NULL) {
886                 CtdlLogPrintf(CTDL_EMERG,
887                         "cdb_trunc must not be called in a transaction.\n");
888                 abort();
889         } else {
890                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
891
892               retry:
893                 /* txbegin(&tid); */
894
895                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
896                                               NULL,     /* transaction ID */
897                                               &count,   /* #rows deleted */
898                                               0))) {    /* flags */
899                         if (ret == DB_LOCK_DEADLOCK) {
900                                 /* txabort(tid); */
901                                 goto retry;
902                         } else {
903                                 CtdlLogPrintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
904                                 if (ret == ENOMEM) {
905                                         CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
906                                 }
907                                 abort();
908                         }
909                 } else {
910                         /* txcommit(tid); */
911                 }
912         }
913 }