* When we get a Berkeley DB panic, display a descriptive Berkeley DB error message...
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Berkeley DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55 #include "control.h"
56
57 #include "ctdl_module.h"
58
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
71 {
72         if (!IsEmptyStr(msg)) {
73                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
74         }
75 }
76
77
78 /* Verbose logging callback */
79 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
80 {
81         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
82 }
83
84
85 /* just a little helper function */
86 static void txabort(DB_TXN * tid)
87 {
88         int ret;
89
90         ret = tid->abort(tid);
91
92         if (ret) {
93                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
94                         db_strerror(ret));
95                 abort();
96         }
97 }
98
99 /* this one is even more helpful than the last. */
100 static void txcommit(DB_TXN * tid)
101 {
102         int ret;
103
104         ret = tid->commit(tid, 0);
105
106         if (ret) {
107                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n", db_strerror(ret));
108                 abort();
109         }
110 }
111
112 /* are you sensing a pattern yet? */
113 static void txbegin(DB_TXN ** tid)
114 {
115         int ret;
116
117         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
118
119         if (ret) {
120                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n", db_strerror(ret));
121                 abort();
122         }
123 }
124
125 static void dbpanic(DB_ENV * env, int errval)
126 {
127         CtdlLogPrintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %s\n", db_strerror(errval));
128 }
129
130 static void cclose(DBC * cursor)
131 {
132         int ret;
133
134         if ((ret = cursor->c_close(cursor))) {
135                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: c_close: %s\n", db_strerror(ret));
136                 abort();
137         }
138 }
139
140 static void bailIfCursor(DBC ** cursors, const char *msg)
141 {
142         int i;
143
144         for (i = 0; i < MAXCDB; i++)
145                 if (cursors[i] != NULL) {
146                         CtdlLogPrintf(CTDL_EMERG,
147                                 "cdb_*: cursor still in progress on cdb %02x: %s\n", i, msg);
148                         abort();
149                 }
150 }
151
152 void check_handles(void *arg)
153 {
154         if (arg != NULL) {
155                 ThreadTSD *tsd = (ThreadTSD *) arg;
156
157                 bailIfCursor(tsd->cursors, "in check_handles");
158
159                 if (tsd->tid != NULL) {
160                         CtdlLogPrintf(CTDL_EMERG,
161                                 "cdb_*: transaction still in progress!");
162                         abort();
163                 }
164         }
165 }
166
167 void cdb_check_handles(void)
168 {
169         check_handles(pthread_getspecific(ThreadKey));
170 }
171
172
173 /*
174  * Cull the database logs
175  */
176 static void cdb_cull_logs(void)
177 {
178         u_int32_t flags;
179         int ret;
180         char **file, **list;
181         char errmsg[SIZ];
182
183         flags = DB_ARCH_ABS;
184
185         /* Get the list of names. */
186         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
187                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
188                 return;
189         }
190
191         /* Print the list of names. */
192         if (list != NULL) {
193                 for (file = list; *file != NULL; ++file) {
194                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
195                         ret = unlink(*file);
196                         if (ret != 0) {
197                                 snprintf(errmsg, sizeof(errmsg),
198                                          " ** ERROR **\n \n \n "
199                                          "Citadel was unable to delete the "
200                                          "database log file '%s' because of the "
201                                          "following error:\n \n %s\n \n"
202                                          " This log file is no longer in use "
203                                          "and may be safely deleted.\n",
204                                          *file, strerror(errno));
205                                 aide_message(errmsg, "Database Warning Message");
206                         }
207                 }
208                 free(list);
209         }
210 }
211
212 /*
213  * Manually initiate log file cull.
214  */
215 void cmd_cull(char *argbuf) {
216         if (CtdlAccessCheck(ac_internal)) return;
217         cdb_cull_logs();
218         cprintf("%d Database log file cull completed.\n", CIT_OK);
219 }
220
221
222 /*
223  * Request a checkpoint of the database.  Called once per minute by the thread manager.
224  */
225 void cdb_checkpoint(void)
226 {
227         int ret;
228
229         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
230         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
231
232         if (ret != 0) {
233                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
234                 abort();
235         }
236
237         /* After a successful checkpoint, we can cull the unused logs */
238         if (config.c_auto_cull) {
239                 cdb_cull_logs();
240         }
241 }
242
243
244
245 /*
246  * Open the various databases we'll be using.  Any database which
247  * does not exist should be created.  Note that we don't need a
248  * critical section here, because there aren't any active threads
249  * manipulating the database yet.
250  */
251 void open_databases(void)
252 {
253         int ret;
254         int i;
255         char dbfilename[32];
256         u_int32_t flags = 0;
257         int dbversion_major, dbversion_minor, dbversion_patch;
258         int current_dbversion = 0;
259
260         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
261         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
262         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
263                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
264
265         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
266
267         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
268         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
269
270         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
271            && (CitControl.MMdbversion > current_dbversion) ) {
272                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
273                                         "of Berkeley DB that is older than that which last created or\n"
274                                         "updated the database.  Because this would probably cause data\n"
275                                         "corruption or loss, the server is aborting execution now.\n");
276                 exit(CTDLEXIT_DB);
277         }
278
279         CitControl.MMdbversion = current_dbversion;
280         put_control();
281
282 #ifdef HAVE_ZLIB
283         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
284 #endif
285
286         /*
287          * Silently try to create the database subdirectory.  If it's
288          * already there, no problem.
289          */
290         mkdir(ctdl_data_dir, 0700);
291         chmod(ctdl_data_dir, 0700);
292         chown(ctdl_data_dir, CTDLUID, (-1));
293
294         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
295         db_env_set_func_yield(sched_yield);
296         ret = db_env_create(&dbenv, 0);
297         if (ret) {
298                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n", db_strerror(ret));
299                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
300                 exit(CTDLEXIT_DB);
301         }
302         dbenv->set_errpfx(dbenv, "citserver");
303         dbenv->set_paniccall(dbenv, dbpanic);
304         dbenv->set_errcall(dbenv, cdb_verbose_err);
305         dbenv->set_errpfx(dbenv, "ctdl");
306 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
307         dbenv->set_msgcall(dbenv, cdb_verbose_log);
308 #endif
309         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
310         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
311
312         /*
313          * We want to specify the shared memory buffer pool cachesize,
314          * but everything else is the default.
315          */
316         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
317         if (ret) {
318                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
319                 dbenv->close(dbenv, 0);
320                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
321                 exit(CTDLEXIT_DB);
322         }
323
324         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
325                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
326                 dbenv->close(dbenv, 0);
327                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
328                 exit(CTDLEXIT_DB);
329         }
330
331         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
332         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
333         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
334         if (ret == DB_RUNRECOVERY) {
335                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
336                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
337                 flags |= DB_RECOVER;
338                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
339         }
340         if (ret == DB_RUNRECOVERY) {
341                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
342                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
343                 flags &= ~DB_RECOVER;
344                 flags |= DB_RECOVER_FATAL;
345                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
346         }
347         if (ret) {
348                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
349                 dbenv->close(dbenv, 0);
350                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
351                 exit(CTDLEXIT_DB);
352         }
353
354         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
355
356         for (i = 0; i < MAXCDB; ++i) {
357
358                 /* Create a database handle */
359                 ret = db_create(&dbp[i], dbenv, 0);
360                 if (ret) {
361                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
362                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
363                         exit(CTDLEXIT_DB);
364                 }
365
366
367                 /* Arbitrary names for our tables -- we reference them by
368                  * number, so we don't have string names for them.
369                  */
370                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
371
372                 ret = dbp[i]->open(dbp[i],
373                                    NULL,
374                                    dbfilename,
375                                    NULL,
376                                    DB_BTREE,
377                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
378                                    0600);
379                 if (ret) {
380                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
381                         if (ret == ENOMEM) {
382                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
383                         }
384                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
385                         exit(CTDLEXIT_DB);
386                 }
387         }
388
389 }
390
391
392 /* Make sure we own all the files, because in a few milliseconds
393  * we're going to drop root privs.
394  */
395 void cdb_chmod_data(void) {
396         DIR *dp;
397         struct dirent *d;
398         char filename[PATH_MAX];
399
400         dp = opendir(ctdl_data_dir);
401         if (dp != NULL) {
402                 while (d = readdir(dp), d != NULL) {
403                         if (d->d_name[0] != '.') {
404                                 snprintf(filename, sizeof filename,
405                                          "%s/%s", ctdl_data_dir, d->d_name);
406                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
407                                         filename, chmod(filename, 0600)
408                                 );
409                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
410                                         filename, chown(filename, CTDLUID, (-1))
411                                 );
412                         }
413                 }
414                 closedir(dp);
415         }
416
417         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
418
419         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
420 }
421
422
423 /*
424  * Close all of the db database files we've opened.  This can be done
425  * in a loop, since it's just a bunch of closes.
426  */
427 void close_databases(void)
428 {
429         int a;
430         int ret;
431
432         ctdl_thread_internal_free_tsd();
433         
434         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
435                 CtdlLogPrintf(CTDL_EMERG,
436                         "txn_checkpoint: %s\n", db_strerror(ret));
437         }
438
439         /* print some statistics... */
440 #ifdef DB_STAT_ALL
441         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
442 #endif
443
444         /* close the tables */
445         for (a = 0; a < MAXCDB; ++a) {
446                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
447                 ret = dbp[a]->close(dbp[a], 0);
448                 if (ret) {
449                         CtdlLogPrintf(CTDL_EMERG,
450                                 "db_close: %s\n", db_strerror(ret));
451                 }
452
453         }
454
455         /* Close the handle. */
456         ret = dbenv->close(dbenv, 0);
457         if (ret) {
458                 CtdlLogPrintf(CTDL_EMERG,
459                         "DBENV->close: %s\n", db_strerror(ret));
460         }
461 }
462
463
464 /*
465  * Compression functions only used if we have zlib
466  */
467 void cdb_decompress_if_necessary(struct cdbdata *cdb)
468 {
469         static int magic = COMPRESS_MAGIC;
470
471         if (cdb == NULL)
472                 return;
473         if (cdb->ptr == NULL)
474                 return;
475         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
476                 return;
477
478 #ifdef HAVE_ZLIB
479         /* At this point we know we're looking at a compressed item. */
480
481         struct CtdlCompressHeader zheader;
482         char *uncompressed_data;
483         char *compressed_data;
484         uLongf destLen, sourceLen;
485
486         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
487
488         compressed_data = cdb->ptr;
489         compressed_data += sizeof(struct CtdlCompressHeader);
490
491         sourceLen = (uLongf) zheader.compressed_len;
492         destLen = (uLongf) zheader.uncompressed_len;
493         uncompressed_data = malloc(zheader.uncompressed_len);
494
495         if (uncompress((Bytef *) uncompressed_data,
496                        (uLongf *) & destLen,
497                        (const Bytef *) compressed_data,
498                        (uLong) sourceLen) != Z_OK) {
499                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
500                 abort();
501         }
502
503         free(cdb->ptr);
504         cdb->len = (size_t) destLen;
505         cdb->ptr = uncompressed_data;
506 #else                           /* HAVE_ZLIB */
507         CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
508         abort();
509 #endif                          /* HAVE_ZLIB */
510 }
511
512
513
514 /*
515  * Store a piece of data.  Returns 0 if the operation was successful.  If a
516  * key already exists it should be overwritten.
517  */
518 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
519 {
520
521         DBT dkey, ddata;
522         DB_TXN *tid;
523         int ret = 0;
524
525 #ifdef HAVE_ZLIB
526         struct CtdlCompressHeader zheader;
527         char *compressed_data = NULL;
528         int compressing = 0;
529         size_t buffer_len = 0;
530         uLongf destLen = 0;
531 #endif
532
533         memset(&dkey, 0, sizeof(DBT));
534         memset(&ddata, 0, sizeof(DBT));
535         dkey.size = ckeylen;
536         dkey.data = ckey;
537         ddata.size = cdatalen;
538         ddata.data = cdata;
539
540 #ifdef HAVE_ZLIB
541         /* Only compress Visit records.  Everything else is uncompressed. */
542         if (cdb == CDB_VISIT) {
543                 compressing = 1;
544                 zheader.magic = COMPRESS_MAGIC;
545                 zheader.uncompressed_len = cdatalen;
546                 buffer_len = ((cdatalen * 101) / 100) + 100
547                     + sizeof(struct CtdlCompressHeader);
548                 destLen = (uLongf) buffer_len;
549                 compressed_data = malloc(buffer_len);
550                 if (compress2((Bytef *) (compressed_data +
551                                          sizeof(struct
552                                                 CtdlCompressHeader)),
553                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
554                               1) != Z_OK) {
555                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
556                         abort();
557                 }
558                 zheader.compressed_len = (size_t) destLen;
559                 memcpy(compressed_data, &zheader,
560                        sizeof(struct CtdlCompressHeader));
561                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
562                                        zheader.compressed_len);
563                 ddata.data = compressed_data;
564         }
565 #endif
566
567         if (MYTID != NULL) {
568                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
569                                     MYTID,      /* transaction ID */
570                                     &dkey,      /* key */
571                                     &ddata,     /* data */
572                                     0); /* flags */
573                 if (ret) {
574                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
575                                 db_strerror(ret));
576                         abort();
577                 }
578 #ifdef HAVE_ZLIB
579                 if (compressing)
580                         free(compressed_data);
581 #endif
582                 return ret;
583
584         } else {
585                 bailIfCursor(MYCURSORS,
586                              "attempt to write during r/o cursor");
587
588               retry:
589                 txbegin(&tid);
590
591                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
592                                          tid,   /* transaction ID */
593                                          &dkey, /* key */
594                                          &ddata,        /* data */
595                                          0))) { /* flags */
596                         if (ret == DB_LOCK_DEADLOCK) {
597                                 txabort(tid);
598                                 goto retry;
599                         } else {
600                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
601                                         cdb, db_strerror(ret));
602                                 abort();
603                         }
604                 } else {
605                         txcommit(tid);
606 #ifdef HAVE_ZLIB
607                         if (compressing)
608                                 free(compressed_data);
609 #endif
610                         return ret;
611                 }
612         }
613 }
614
615
616 /*
617  * Delete a piece of data.  Returns 0 if the operation was successful.
618  */
619 int cdb_delete(int cdb, void *key, int keylen)
620 {
621
622         DBT dkey;
623         DB_TXN *tid;
624         int ret;
625
626         memset(&dkey, 0, sizeof dkey);
627         dkey.size = keylen;
628         dkey.data = key;
629
630         if (MYTID != NULL) {
631                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
632                 if (ret) {
633                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
634                                 db_strerror(ret));
635                         if (ret != DB_NOTFOUND)
636                                 abort();
637                 }
638         } else {
639                 bailIfCursor(MYCURSORS,
640                              "attempt to delete during r/o cursor");
641
642               retry:
643                 txbegin(&tid);
644
645                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
646                     && ret != DB_NOTFOUND) {
647                         if (ret == DB_LOCK_DEADLOCK) {
648                                 txabort(tid);
649                                 goto retry;
650                         } else {
651                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
652                                         cdb, db_strerror(ret));
653                                 abort();
654                         }
655                 } else {
656                         txcommit(tid);
657                 }
658         }
659         return ret;
660 }
661
662 static DBC *localcursor(int cdb)
663 {
664         int ret;
665         DBC *curs;
666
667         if (MYCURSORS[cdb] == NULL)
668                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
669         else
670                 ret =
671                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
672                                           DB_POSITION);
673
674         if (ret) {
675                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
676                 abort();
677         }
678
679         return curs;
680 }
681
682
683 /*
684  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
685  * a struct cdbdata which it is the caller's responsibility to free later on
686  * using the cdb_free() routine.
687  */
688 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
689 {
690
691         struct cdbdata *tempcdb;
692         DBT dkey, dret;
693         int ret;
694
695         memset(&dkey, 0, sizeof(DBT));
696         dkey.size = keylen;
697         dkey.data = key;
698
699         if (MYTID != NULL) {
700                 memset(&dret, 0, sizeof(DBT));
701                 dret.flags = DB_DBT_MALLOC;
702                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
703         } else {
704                 DBC *curs;
705
706                 do {
707                         memset(&dret, 0, sizeof(DBT));
708                         dret.flags = DB_DBT_MALLOC;
709
710                         curs = localcursor(cdb);
711
712                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
713                         cclose(curs);
714                 }
715                 while (ret == DB_LOCK_DEADLOCK);
716
717         }
718
719         if ((ret != 0) && (ret != DB_NOTFOUND)) {
720                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
721                         db_strerror(ret));
722                 abort();
723         }
724
725         if (ret != 0)
726                 return NULL;
727         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
728
729         if (tempcdb == NULL) {
730                 CtdlLogPrintf(CTDL_EMERG,
731                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
732                 abort();
733         }
734
735         tempcdb->len = dret.size;
736         tempcdb->ptr = dret.data;
737         cdb_decompress_if_necessary(tempcdb);
738         return (tempcdb);
739 }
740
741
742 /*
743  * Free a cdbdata item.
744  *
745  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
746  * other code to assume ownership of that memory simply by storing the
747  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
748  * avoid freeing it.
749  */
750 void cdb_free(struct cdbdata *cdb)
751 {
752         if (cdb->ptr) {
753                 free(cdb->ptr);
754         }
755         free(cdb);
756 }
757
758 void cdb_close_cursor(int cdb)
759 {
760         if (MYCURSORS[cdb] != NULL)
761                 cclose(MYCURSORS[cdb]);
762
763         MYCURSORS[cdb] = NULL;
764 }
765
766 /* 
767  * Prepare for a sequential search of an entire database.
768  * (There is guaranteed to be no more than one traversal in
769  * progress per thread at any given time.)
770  */
771 void cdb_rewind(int cdb)
772 {
773         int ret = 0;
774
775         if (MYCURSORS[cdb] != NULL) {
776                 CtdlLogPrintf(CTDL_EMERG,
777                         "cdb_rewind: must close cursor on database %d before reopening.\n",
778                         cdb);
779                 abort();
780                 /* cclose(MYCURSORS[cdb]); */
781         }
782
783         /*
784          * Now initialize the cursor
785          */
786         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
787         if (ret) {
788                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
789                         db_strerror(ret));
790                 abort();
791         }
792 }
793
794
795 /*
796  * Fetch the next item in a sequential search.  Returns a pointer to a 
797  * cdbdata structure, or NULL if we've hit the end.
798  */
799 struct cdbdata *cdb_next_item(int cdb)
800 {
801         DBT key, data;
802         struct cdbdata *cdbret;
803         int ret = 0;
804
805         /* Initialize the key/data pair so the flags aren't set. */
806         memset(&key, 0, sizeof(key));
807         memset(&data, 0, sizeof(data));
808         data.flags = DB_DBT_MALLOC;
809
810         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
811
812         if (ret) {
813                 if (ret != DB_NOTFOUND) {
814                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
815                                 cdb, db_strerror(ret));
816                         abort();
817                 }
818                 cclose(MYCURSORS[cdb]);
819                 MYCURSORS[cdb] = NULL;
820                 return NULL;    /* presumably, end of file */
821         }
822
823         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
824         cdbret->len = data.size;
825         cdbret->ptr = data.data;
826         cdb_decompress_if_necessary(cdbret);
827
828         return (cdbret);
829 }
830
831
832
833 /*
834  * Transaction-based stuff.  I'm writing this as I bake cookies...
835  */
836
837 void cdb_begin_transaction(void)
838 {
839
840         bailIfCursor(MYCURSORS,
841                      "can't begin transaction during r/o cursor");
842
843         if (MYTID != NULL) {
844                 CtdlLogPrintf(CTDL_EMERG,
845                         "cdb_begin_transaction: ERROR: nested transaction\n");
846                 abort();
847         }
848
849         txbegin(&MYTID);
850 }
851
852 void cdb_end_transaction(void)
853 {
854         int i;
855
856         for (i = 0; i < MAXCDB; i++)
857                 if (MYCURSORS[i] != NULL) {
858                         CtdlLogPrintf(CTDL_WARNING,
859                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
860                                 i);
861                         cclose(MYCURSORS[i]);
862                         MYCURSORS[i] = NULL;
863                 }
864
865         if (MYTID == NULL) {
866                 CtdlLogPrintf(CTDL_EMERG,
867                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
868                 abort();
869         } else
870                 txcommit(MYTID);
871
872         MYTID = NULL;
873 }
874
875 /*
876  * Truncate (delete every record)
877  */
878 void cdb_trunc(int cdb)
879 {
880         /* DB_TXN *tid; */
881         int ret;
882         u_int32_t count;
883
884         if (MYTID != NULL) {
885                 CtdlLogPrintf(CTDL_EMERG,
886                         "cdb_trunc must not be called in a transaction.\n");
887                 abort();
888         } else {
889                 bailIfCursor(MYCURSORS,
890                              "attempt to write during r/o cursor");
891
892               retry:
893                 /* txbegin(&tid); */
894
895                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
896                                               NULL,     /* transaction ID */
897                                               &count,   /* #rows deleted */
898                                               0))) {    /* flags */
899                         if (ret == DB_LOCK_DEADLOCK) {
900                                 /* txabort(tid); */
901                                 goto retry;
902                         } else {
903                                 CtdlLogPrintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
904                                 if (ret == ENOMEM) {
905                                         CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
906                                 }
907                                 abort();
908                         }
909                 } else {
910                         /* txcommit(tid); */
911                 }
912         }
913 }