8f5e5c64af74612ad7dd146b1f1dda12b4700cfd
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Berkeley DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55 #include "control.h"
56
57 #include "ctdl_module.h"
58
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
71 {
72         if (!IsEmptyStr(msg)) {
73                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
74         }
75 }
76
77
78 /* Verbose logging callback */
79 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
80 {
81         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
82 }
83
84
85 /* just a little helper function */
86 static void txabort(DB_TXN * tid)
87 {
88         int ret;
89
90         ret = tid->abort(tid);
91
92         if (ret) {
93                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_abort: %s\n", db_strerror(ret));
94                 abort();
95         }
96 }
97
98 /* this one is even more helpful than the last. */
99 static void txcommit(DB_TXN * tid)
100 {
101         int ret;
102
103         ret = tid->commit(tid, 0);
104
105         if (ret) {
106                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret));
107                 abort();
108         }
109 }
110
111 /* are you sensing a pattern yet? */
112 static void txbegin(DB_TXN ** tid)
113 {
114         int ret;
115
116         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
117
118         if (ret) {
119                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret));
120                 abort();
121         }
122 }
123
124 static void dbpanic(DB_ENV * env, int errval)
125 {
126         CtdlLogPrintf(CTDL_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval));
127 }
128
129 static void cclose(DBC * cursor)
130 {
131         int ret;
132
133         if ((ret = cursor->c_close(cursor))) {
134                 CtdlLogPrintf(CTDL_EMERG, "bdb(): c_close: %s\n", db_strerror(ret));
135                 abort();
136         }
137 }
138
139 static void bailIfCursor(DBC ** cursors, const char *msg)
140 {
141         int i;
142
143         for (i = 0; i < MAXCDB; i++)
144                 if (cursors[i] != NULL) {
145                         CtdlLogPrintf(CTDL_EMERG,
146                                 "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg);
147                         abort();
148                 }
149 }
150
151 void check_handles(void *arg)
152 {
153         if (arg != NULL) {
154                 ThreadTSD *tsd = (ThreadTSD *) arg;
155
156                 bailIfCursor(tsd->cursors, "in check_handles");
157
158                 if (tsd->tid != NULL) {
159                         CtdlLogPrintf(CTDL_EMERG, "bdb(): transaction still in progress!");
160                         abort();
161                 }
162         }
163 }
164
165 void cdb_check_handles(void)
166 {
167         check_handles(pthread_getspecific(ThreadKey));
168 }
169
170
171 /*
172  * Cull the database logs
173  */
174 static void cdb_cull_logs(void)
175 {
176         u_int32_t flags;
177         int ret;
178         char **file, **list;
179         char errmsg[SIZ];
180
181         flags = DB_ARCH_ABS;
182
183         /* Get the list of names. */
184         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
185                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
186                 return;
187         }
188
189         /* Print the list of names. */
190         if (list != NULL) {
191                 for (file = list; *file != NULL; ++file) {
192                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
193                         ret = unlink(*file);
194                         if (ret != 0) {
195                                 snprintf(errmsg, sizeof(errmsg),
196                                          " ** ERROR **\n \n \n "
197                                          "Citadel was unable to delete the "
198                                          "database log file '%s' because of the "
199                                          "following error:\n \n %s\n \n"
200                                          " This log file is no longer in use "
201                                          "and may be safely deleted.\n",
202                                          *file, strerror(errno));
203                                 aide_message(errmsg, "Database Warning Message");
204                         }
205                 }
206                 free(list);
207         }
208 }
209
210 /*
211  * Manually initiate log file cull.
212  */
213 void cmd_cull(char *argbuf) {
214         if (CtdlAccessCheck(ac_internal)) return;
215         cdb_cull_logs();
216         cprintf("%d Database log file cull completed.\n", CIT_OK);
217 }
218
219
220 /*
221  * Request a checkpoint of the database.  Called once per minute by the thread manager.
222  */
223 void cdb_checkpoint(void)
224 {
225         int ret;
226
227         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
228         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
229
230         if (ret != 0) {
231                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
232                 abort();
233         }
234
235         /* After a successful checkpoint, we can cull the unused logs */
236         if (config.c_auto_cull) {
237                 cdb_cull_logs();
238         }
239 }
240
241
242
243 /*
244  * Open the various databases we'll be using.  Any database which
245  * does not exist should be created.  Note that we don't need a
246  * critical section here, because there aren't any active threads
247  * manipulating the database yet.
248  */
249 void open_databases(void)
250 {
251         int ret;
252         int i;
253         char dbfilename[32];
254         u_int32_t flags = 0;
255         int dbversion_major, dbversion_minor, dbversion_patch;
256         int current_dbversion = 0;
257
258         CtdlLogPrintf(CTDL_DEBUG, "bdb(): open_databases() starting\n");
259         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
260         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
261                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
262
263         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
264
265         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
266         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
267
268         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
269            && (CitControl.MMdbversion > current_dbversion) ) {
270                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
271                                         "of Berkeley DB that is older than that which last created or\n"
272                                         "updated the database.  Because this would probably cause data\n"
273                                         "corruption or loss, the server is aborting execution now.\n");
274                 exit(CTDLEXIT_DB);
275         }
276
277         CitControl.MMdbversion = current_dbversion;
278         put_control();
279
280 #ifdef HAVE_ZLIB
281         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
282 #endif
283
284         /*
285          * Silently try to create the database subdirectory.  If it's
286          * already there, no problem.
287          */
288         mkdir(ctdl_data_dir, 0700);
289         chmod(ctdl_data_dir, 0700);
290         chown(ctdl_data_dir, CTDLUID, (-1));
291
292         CtdlLogPrintf(CTDL_DEBUG, "bdb(): Setting up DB environment\n");
293         db_env_set_func_yield(sched_yield);
294         ret = db_env_create(&dbenv, 0);
295         if (ret) {
296                 CtdlLogPrintf(CTDL_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
297                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
298                 exit(CTDLEXIT_DB);
299         }
300         dbenv->set_errpfx(dbenv, "citserver");
301         dbenv->set_paniccall(dbenv, dbpanic);
302         dbenv->set_errcall(dbenv, cdb_verbose_err);
303         dbenv->set_errpfx(dbenv, "ctdl");
304 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
305         dbenv->set_msgcall(dbenv, cdb_verbose_log);
306 #endif
307         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
308         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
309
310         /*
311          * We want to specify the shared memory buffer pool cachesize,
312          * but everything else is the default.
313          */
314         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
315         if (ret) {
316                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
317                 dbenv->close(dbenv, 0);
318                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
319                 exit(CTDLEXIT_DB);
320         }
321
322         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
323                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
324                 dbenv->close(dbenv, 0);
325                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
326                 exit(CTDLEXIT_DB);
327         }
328
329         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
330         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
331         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
332         if (ret == DB_RUNRECOVERY) {
333                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
334                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
335                 flags |= DB_RECOVER;
336                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
337         }
338         if (ret == DB_RUNRECOVERY) {
339                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
340                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
341                 flags &= ~DB_RECOVER;
342                 flags |= DB_RECOVER_FATAL;
343                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
344         }
345         if (ret) {
346                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
347                 dbenv->close(dbenv, 0);
348                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
349                 exit(CTDLEXIT_DB);
350         }
351
352         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
353
354         for (i = 0; i < MAXCDB; ++i) {
355
356                 /* Create a database handle */
357                 ret = db_create(&dbp[i], dbenv, 0);
358                 if (ret) {
359                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
360                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
361                         exit(CTDLEXIT_DB);
362                 }
363
364
365                 /* Arbitrary names for our tables -- we reference them by
366                  * number, so we don't have string names for them.
367                  */
368                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
369
370                 ret = dbp[i]->open(dbp[i],
371                                    NULL,
372                                    dbfilename,
373                                    NULL,
374                                    DB_BTREE,
375                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
376                                    0600
377                 );
378                 if (ret) {
379                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
380                         if (ret == ENOMEM) {
381                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
382                         }
383                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
384                         exit(CTDLEXIT_DB);
385                 }
386         }
387
388 }
389
390
391 /* Make sure we own all the files, because in a few milliseconds
392  * we're going to drop root privs.
393  */
394 void cdb_chmod_data(void) {
395         DIR *dp;
396         struct dirent *d;
397         char filename[PATH_MAX];
398
399         dp = opendir(ctdl_data_dir);
400         if (dp != NULL) {
401                 while (d = readdir(dp), d != NULL) {
402                         if (d->d_name[0] != '.') {
403                                 snprintf(filename, sizeof filename,
404                                          "%s/%s", ctdl_data_dir, d->d_name);
405                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
406                                         filename, chmod(filename, 0600)
407                                 );
408                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
409                                         filename, chown(filename, CTDLUID, (-1))
410                                 );
411                         }
412                 }
413                 closedir(dp);
414         }
415
416         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
417         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
418 }
419
420
421 /*
422  * Close all of the db database files we've opened.  This can be done
423  * in a loop, since it's just a bunch of closes.
424  */
425 void close_databases(void)
426 {
427         int a;
428         int ret;
429
430         ctdl_thread_internal_free_tsd();
431         
432         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
433                 CtdlLogPrintf(CTDL_EMERG,
434                         "txn_checkpoint: %s\n", db_strerror(ret));
435         }
436
437         /* print some statistics... */
438 #ifdef DB_STAT_ALL
439         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
440 #endif
441
442         /* close the tables */
443         for (a = 0; a < MAXCDB; ++a) {
444                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
445                 ret = dbp[a]->close(dbp[a], 0);
446                 if (ret) {
447                         CtdlLogPrintf(CTDL_EMERG, "db_close: %s\n", db_strerror(ret));
448                 }
449
450         }
451
452         /* Close the handle. */
453         ret = dbenv->close(dbenv, 0);
454         if (ret) {
455                 CtdlLogPrintf(CTDL_EMERG, "DBENV->close: %s\n", db_strerror(ret));
456         }
457 }
458
459
460 /*
461  * Compression functions only used if we have zlib
462  */
463 void cdb_decompress_if_necessary(struct cdbdata *cdb)
464 {
465         static int magic = COMPRESS_MAGIC;
466
467         if (cdb == NULL)
468                 return;
469         if (cdb->ptr == NULL)
470                 return;
471         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
472                 return;
473
474 #ifdef HAVE_ZLIB
475         /* At this point we know we're looking at a compressed item. */
476
477         struct CtdlCompressHeader zheader;
478         char *uncompressed_data;
479         char *compressed_data;
480         uLongf destLen, sourceLen;
481
482         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
483
484         compressed_data = cdb->ptr;
485         compressed_data += sizeof(struct CtdlCompressHeader);
486
487         sourceLen = (uLongf) zheader.compressed_len;
488         destLen = (uLongf) zheader.uncompressed_len;
489         uncompressed_data = malloc(zheader.uncompressed_len);
490
491         if (uncompress((Bytef *) uncompressed_data,
492                        (uLongf *) & destLen,
493                        (const Bytef *) compressed_data,
494                        (uLong) sourceLen) != Z_OK) {
495                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
496                 abort();
497         }
498
499         free(cdb->ptr);
500         cdb->len = (size_t) destLen;
501         cdb->ptr = uncompressed_data;
502 #else                           /* HAVE_ZLIB */
503         CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
504         abort();
505 #endif                          /* HAVE_ZLIB */
506 }
507
508
509
510 /*
511  * Store a piece of data.  Returns 0 if the operation was successful.  If a
512  * key already exists it should be overwritten.
513  */
514 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
515 {
516
517         DBT dkey, ddata;
518         DB_TXN *tid;
519         int ret = 0;
520
521 #ifdef HAVE_ZLIB
522         struct CtdlCompressHeader zheader;
523         char *compressed_data = NULL;
524         int compressing = 0;
525         size_t buffer_len = 0;
526         uLongf destLen = 0;
527 #endif
528
529         memset(&dkey, 0, sizeof(DBT));
530         memset(&ddata, 0, sizeof(DBT));
531         dkey.size = ckeylen;
532         dkey.data = ckey;
533         ddata.size = cdatalen;
534         ddata.data = cdata;
535
536 #ifdef HAVE_ZLIB
537         /* Only compress Visit records.  Everything else is uncompressed. */
538         if (cdb == CDB_VISIT) {
539                 compressing = 1;
540                 zheader.magic = COMPRESS_MAGIC;
541                 zheader.uncompressed_len = cdatalen;
542                 buffer_len = ((cdatalen * 101) / 100) + 100
543                     + sizeof(struct CtdlCompressHeader);
544                 destLen = (uLongf) buffer_len;
545                 compressed_data = malloc(buffer_len);
546                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
547                         &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
548                 {
549                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
550                         abort();
551                 }
552                 zheader.compressed_len = (size_t) destLen;
553                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
554                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
555                 ddata.data = compressed_data;
556         }
557 #endif
558
559         if (MYTID != NULL) {
560                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
561                                     MYTID,      /* transaction ID */
562                                     &dkey,      /* key */
563                                     &ddata,     /* data */
564                                     0); /* flags */
565                 if (ret) {
566                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
567                         abort();
568                 }
569 #ifdef HAVE_ZLIB
570                 if (compressing)
571                         free(compressed_data);
572 #endif
573                 return ret;
574
575         } else {
576                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
577
578               retry:
579                 txbegin(&tid);
580
581                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
582                                          tid,   /* transaction ID */
583                                          &dkey, /* key */
584                                          &ddata,        /* data */
585                                          0))) { /* flags */
586                         if (ret == DB_LOCK_DEADLOCK) {
587                                 txabort(tid);
588                                 goto retry;
589                         } else {
590                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
591                                         cdb, db_strerror(ret));
592                                 abort();
593                         }
594                 } else {
595                         txcommit(tid);
596 #ifdef HAVE_ZLIB
597                         if (compressing) {
598                                 free(compressed_data);
599                         }
600 #endif
601                         return ret;
602                 }
603         }
604 }
605
606
607 /*
608  * Delete a piece of data.  Returns 0 if the operation was successful.
609  */
610 int cdb_delete(int cdb, void *key, int keylen)
611 {
612
613         DBT dkey;
614         DB_TXN *tid;
615         int ret;
616
617         memset(&dkey, 0, sizeof dkey);
618         dkey.size = keylen;
619         dkey.data = key;
620
621         if (MYTID != NULL) {
622                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
623                 if (ret) {
624                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
625                         if (ret != DB_NOTFOUND) {
626                                 abort();
627                         }
628                 }
629         } else {
630                 bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
631
632               retry:
633                 txbegin(&tid);
634
635                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
636                     && ret != DB_NOTFOUND) {
637                         if (ret == DB_LOCK_DEADLOCK) {
638                                 txabort(tid);
639                                 goto retry;
640                         } else {
641                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
642                                         cdb, db_strerror(ret));
643                                 abort();
644                         }
645                 } else {
646                         txcommit(tid);
647                 }
648         }
649         return ret;
650 }
651
652 static DBC *localcursor(int cdb)
653 {
654         int ret;
655         DBC *curs;
656
657         if (MYCURSORS[cdb] == NULL)
658                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
659         else
660                 ret =
661                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
662                                           DB_POSITION);
663
664         if (ret) {
665                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
666                 abort();
667         }
668
669         return curs;
670 }
671
672
673 /*
674  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
675  * a struct cdbdata which it is the caller's responsibility to free later on
676  * using the cdb_free() routine.
677  */
678 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
679 {
680
681         struct cdbdata *tempcdb;
682         DBT dkey, dret;
683         int ret;
684
685         memset(&dkey, 0, sizeof(DBT));
686         dkey.size = keylen;
687         dkey.data = key;
688
689         if (MYTID != NULL) {
690                 memset(&dret, 0, sizeof(DBT));
691                 dret.flags = DB_DBT_MALLOC;
692                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
693         } else {
694                 DBC *curs;
695
696                 do {
697                         memset(&dret, 0, sizeof(DBT));
698                         dret.flags = DB_DBT_MALLOC;
699
700                         curs = localcursor(cdb);
701
702                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
703                         cclose(curs);
704                 }
705                 while (ret == DB_LOCK_DEADLOCK);
706
707         }
708
709         if ((ret != 0) && (ret != DB_NOTFOUND)) {
710                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
711                 abort();
712         }
713
714         if (ret != 0)
715                 return NULL;
716         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
717
718         if (tempcdb == NULL) {
719                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
720                 abort();
721         }
722
723         tempcdb->len = dret.size;
724         tempcdb->ptr = dret.data;
725         cdb_decompress_if_necessary(tempcdb);
726         return (tempcdb);
727 }
728
729
730 /*
731  * Free a cdbdata item.
732  *
733  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
734  * other code to assume ownership of that memory simply by storing the
735  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
736  * avoid freeing it.
737  */
738 void cdb_free(struct cdbdata *cdb)
739 {
740         if (cdb->ptr) {
741                 free(cdb->ptr);
742         }
743         free(cdb);
744 }
745
746 void cdb_close_cursor(int cdb)
747 {
748         if (MYCURSORS[cdb] != NULL) {
749                 cclose(MYCURSORS[cdb]);
750         }
751
752         MYCURSORS[cdb] = NULL;
753 }
754
755 /* 
756  * Prepare for a sequential search of an entire database.
757  * (There is guaranteed to be no more than one traversal in
758  * progress per thread at any given time.)
759  */
760 void cdb_rewind(int cdb)
761 {
762         int ret = 0;
763
764         if (MYCURSORS[cdb] != NULL) {
765                 CtdlLogPrintf(CTDL_EMERG,
766                         "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
767                 abort();
768                 /* cclose(MYCURSORS[cdb]); */
769         }
770
771         /*
772          * Now initialize the cursor
773          */
774         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
775         if (ret) {
776                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
777                 abort();
778         }
779 }
780
781
782 /*
783  * Fetch the next item in a sequential search.  Returns a pointer to a 
784  * cdbdata structure, or NULL if we've hit the end.
785  */
786 struct cdbdata *cdb_next_item(int cdb)
787 {
788         DBT key, data;
789         struct cdbdata *cdbret;
790         int ret = 0;
791
792         /* Initialize the key/data pair so the flags aren't set. */
793         memset(&key, 0, sizeof(key));
794         memset(&data, 0, sizeof(data));
795         data.flags = DB_DBT_MALLOC;
796
797         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
798
799         if (ret) {
800                 if (ret != DB_NOTFOUND) {
801                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
802                         abort();
803                 }
804                 cclose(MYCURSORS[cdb]);
805                 MYCURSORS[cdb] = NULL;
806                 return NULL;    /* presumably, end of file */
807         }
808
809         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
810         cdbret->len = data.size;
811         cdbret->ptr = data.data;
812         cdb_decompress_if_necessary(cdbret);
813
814         return (cdbret);
815 }
816
817
818
819 /*
820  * Transaction-based stuff.  I'm writing this as I bake cookies...
821  */
822
823 void cdb_begin_transaction(void)
824 {
825
826         bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
827
828         if (MYTID != NULL) {
829                 CtdlLogPrintf(CTDL_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
830                 abort();
831         }
832
833         txbegin(&MYTID);
834 }
835
836 void cdb_end_transaction(void)
837 {
838         int i;
839
840         for (i = 0; i < MAXCDB; i++)
841                 if (MYCURSORS[i] != NULL) {
842                         CtdlLogPrintf(CTDL_WARNING,
843                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
844                                 i);
845                         cclose(MYCURSORS[i]);
846                         MYCURSORS[i] = NULL;
847                 }
848
849         if (MYTID == NULL) {
850                 CtdlLogPrintf(CTDL_EMERG,
851                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
852                 abort();
853         } else {
854                 txcommit(MYTID);
855         }
856
857         MYTID = NULL;
858 }
859
860 /*
861  * Truncate (delete every record)
862  */
863 void cdb_trunc(int cdb)
864 {
865         /* DB_TXN *tid; */
866         int ret;
867         u_int32_t count;
868
869         if (MYTID != NULL) {
870                 CtdlLogPrintf(CTDL_EMERG,
871                         "cdb_trunc must not be called in a transaction.\n");
872                 abort();
873         } else {
874                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
875
876               retry:
877                 /* txbegin(&tid); */
878
879                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
880                                               NULL,     /* transaction ID */
881                                               &count,   /* #rows deleted */
882                                               0))) {    /* flags */
883                         if (ret == DB_LOCK_DEADLOCK) {
884                                 /* txabort(tid); */
885                                 goto retry;
886                         } else {
887                                 CtdlLogPrintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
888                                 if (ret == ENOMEM) {
889                                         CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
890                                 }
891                                 abort();
892                         }
893                 } else {
894                         /* txcommit(tid); */
895                 }
896         }
897 }