* want if mkdir, chmod, chown fail...
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  * Copyright (c) 1987-2009 by the citadel.org team
7  *
8  *  This program is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation; either version 3 of the License, or
11  *  (at your option) any later version.
12  *
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  *
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 /*****************************************************************************
25        Tunable configuration parameters for the Berkeley DB back end
26  *****************************************************************************/
27
28 /* Citadel will checkpoint the db at the end of every session, but only if
29  * the specified number of kilobytes has been written, or if the specified
30  * number of minutes has passed, since the last checkpoint.
31  */
32 #define MAX_CHECKPOINT_KBYTES   256
33 #define MAX_CHECKPOINT_MINUTES  15
34
35 /*****************************************************************************/
36
37 #include "sysdep.h"
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <ctype.h>
42 #include <string.h>
43 #include <errno.h>
44 #include <sys/types.h>
45 #include <sys/stat.h>
46 #include <dirent.h>
47
48 #ifdef HAVE_DB_H
49 #include <db.h>
50 #elif defined(HAVE_DB4_DB_H)
51 #include <db4/db.h>
52 #else
53 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
54 #endif
55
56
57 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
58 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
59 #endif
60
61
62 #include <libcitadel.h>
63 #include "citadel.h"
64 #include "server.h"
65 #include "citserver.h"
66 #include "database.h"
67 #include "msgbase.h"
68 #include "sysdep_decls.h"
69 #include "threads.h"
70 #include "config.h"
71 #include "control.h"
72
73 #include "ctdl_module.h"
74
75
76 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
77 static DB_ENV *dbenv;           /* The DB environment (global) */
78
79
80 #ifdef HAVE_ZLIB
81 #include <zlib.h>
82 #endif
83
84
85 /* Verbose logging callback */
86 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
87 {
88         if (!IsEmptyStr(msg)) {
89                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
90         }
91 }
92
93
94 /* Verbose logging callback */
95 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
96 {
97         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
98 }
99
100
101 /* just a little helper function */
102 static void txabort(DB_TXN * tid)
103 {
104         int ret;
105
106         ret = tid->abort(tid);
107
108         if (ret) {
109                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_abort: %s\n", db_strerror(ret));
110                 abort();
111         }
112 }
113
114 /* this one is even more helpful than the last. */
115 static void txcommit(DB_TXN * tid)
116 {
117         int ret;
118
119         ret = tid->commit(tid, 0);
120
121         if (ret) {
122                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret));
123                 abort();
124         }
125 }
126
127 /* are you sensing a pattern yet? */
128 static void txbegin(DB_TXN ** tid)
129 {
130         int ret;
131
132         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
133
134         if (ret) {
135                 CtdlLogPrintf(CTDL_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret));
136                 abort();
137         }
138 }
139
140 static void dbpanic(DB_ENV * env, int errval)
141 {
142         CtdlLogPrintf(CTDL_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval));
143 }
144
145 static void cclose(DBC * cursor)
146 {
147         int ret;
148
149         if ((ret = cursor->c_close(cursor))) {
150                 CtdlLogPrintf(CTDL_EMERG, "bdb(): c_close: %s\n", db_strerror(ret));
151                 abort();
152         }
153 }
154
155 static void bailIfCursor(DBC ** cursors, const char *msg)
156 {
157         int i;
158
159         for (i = 0; i < MAXCDB; i++)
160                 if (cursors[i] != NULL) {
161                         CtdlLogPrintf(CTDL_EMERG,
162                                 "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg);
163                         abort();
164                 }
165 }
166
167 void check_handles(void *arg)
168 {
169         if (arg != NULL) {
170                 ThreadTSD *tsd = (ThreadTSD *) arg;
171
172                 bailIfCursor(tsd->cursors, "in check_handles");
173
174                 if (tsd->tid != NULL) {
175                         CtdlLogPrintf(CTDL_EMERG, "bdb(): transaction still in progress!");
176                         abort();
177                 }
178         }
179 }
180
181 void cdb_check_handles(void)
182 {
183         check_handles(pthread_getspecific(ThreadKey));
184 }
185
186
187 /*
188  * Cull the database logs
189  */
190 static void cdb_cull_logs(void)
191 {
192         u_int32_t flags;
193         int ret;
194         char **file, **list;
195         char errmsg[SIZ];
196
197         flags = DB_ARCH_ABS;
198
199         /* Get the list of names. */
200         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
201                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
202                 return;
203         }
204
205         /* Print the list of names. */
206         if (list != NULL) {
207                 for (file = list; *file != NULL; ++file) {
208                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
209                         ret = unlink(*file);
210                         if (ret != 0) {
211                                 snprintf(errmsg, sizeof(errmsg),
212                                          " ** ERROR **\n \n \n "
213                                          "Citadel was unable to delete the "
214                                          "database log file '%s' because of the "
215                                          "following error:\n \n %s\n \n"
216                                          " This log file is no longer in use "
217                                          "and may be safely deleted.\n",
218                                          *file, strerror(errno));
219                                 CtdlAideMessage(errmsg, "Database Warning Message");
220                         }
221                 }
222                 free(list);
223         }
224 }
225
226 /*
227  * Manually initiate log file cull.
228  */
229 void cmd_cull(char *argbuf) {
230         if (CtdlAccessCheck(ac_internal)) return;
231         cdb_cull_logs();
232         cprintf("%d Database log file cull completed.\n", CIT_OK);
233 }
234
235
236 /*
237  * Request a checkpoint of the database.  Called once per minute by the thread manager.
238  */
239 void cdb_checkpoint(void)
240 {
241         int ret;
242
243         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
244         ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
245
246         if (ret != 0) {
247                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
248                 abort();
249         }
250
251         /* After a successful checkpoint, we can cull the unused logs */
252         if (config.c_auto_cull) {
253                 cdb_cull_logs();
254         }
255 }
256
257
258
259 /*
260  * Open the various databases we'll be using.  Any database which
261  * does not exist should be created.  Note that we don't need a
262  * critical section here, because there aren't any active threads
263  * manipulating the database yet.
264  */
265 void open_databases(void)
266 {
267         int ret;
268         int i;
269         char dbfilename[32];
270         u_int32_t flags = 0;
271         int dbversion_major, dbversion_minor, dbversion_patch;
272         int current_dbversion = 0;
273
274         CtdlLogPrintf(CTDL_DEBUG, "bdb(): open_databases() starting\n");
275         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
276         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
277                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
278
279         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
280
281         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
282         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
283
284         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
285            && (CitControl.MMdbversion > current_dbversion) ) {
286                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
287                                         "of Berkeley DB that is older than that which last created or\n"
288                                         "updated the database.  Because this would probably cause data\n"
289                                         "corruption or loss, the server is aborting execution now.\n");
290                 exit(CTDLEXIT_DB);
291         }
292
293         CitControl.MMdbversion = current_dbversion;
294         put_control();
295
296 #ifdef HAVE_ZLIB
297         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
298 #endif
299
300         /*
301          * Silently try to create the database subdirectory.  If it's
302          * already there, no problem.
303          */
304         if ((mkdir(ctdl_data_dir, 0700) != 0) && (errno != EEXIST)){
305                 CtdlLogPrintf(CTDL_EMERG, 
306                               "unable to create database directory [%s]: %s", 
307                               ctdl_data_dir, strerror(errno));
308         }
309         if (chmod(ctdl_data_dir, 0700) != 0){
310                 CtdlLogPrintf(CTDL_EMERG, 
311                               "unable to set database directory accessrights [%s]: %s", 
312                               ctdl_data_dir, strerror(errno));
313         }
314         if (chown(ctdl_data_dir, CTDLUID, (-1)) != 0){
315                 CtdlLogPrintf(CTDL_EMERG, 
316                               "unable to set the owner for [%s]: %s", 
317                               ctdl_data_dir, strerror(errno));
318         }
319         CtdlLogPrintf(CTDL_DEBUG, "bdb(): Setting up DB environment\n");
320         db_env_set_func_yield(sched_yield);
321         ret = db_env_create(&dbenv, 0);
322         if (ret) {
323                 CtdlLogPrintf(CTDL_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
324                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
325                 exit(CTDLEXIT_DB);
326         }
327         dbenv->set_errpfx(dbenv, "citserver");
328         dbenv->set_paniccall(dbenv, dbpanic);
329         dbenv->set_errcall(dbenv, cdb_verbose_err);
330         dbenv->set_errpfx(dbenv, "ctdl");
331 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
332         dbenv->set_msgcall(dbenv, cdb_verbose_log);
333 #endif
334         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
335         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
336
337         /*
338          * We want to specify the shared memory buffer pool cachesize,
339          * but everything else is the default.
340          */
341         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
342         if (ret) {
343                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
344                 dbenv->close(dbenv, 0);
345                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
346                 exit(CTDLEXIT_DB);
347         }
348
349         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
350                 CtdlLogPrintf(CTDL_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
351                 dbenv->close(dbenv, 0);
352                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
353                 exit(CTDLEXIT_DB);
354         }
355
356         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
357         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
358         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
359         if (ret == DB_RUNRECOVERY) {
360                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
361                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
362                 flags |= DB_RECOVER;
363                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
364         }
365         if (ret == DB_RUNRECOVERY) {
366                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
367                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
368                 flags &= ~DB_RECOVER;
369                 flags |= DB_RECOVER_FATAL;
370                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
371         }
372         if (ret) {
373                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
374                 dbenv->close(dbenv, 0);
375                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
376                 exit(CTDLEXIT_DB);
377         }
378
379         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
380
381         for (i = 0; i < MAXCDB; ++i) {
382
383                 /* Create a database handle */
384                 ret = db_create(&dbp[i], dbenv, 0);
385                 if (ret) {
386                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
387                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
388                         exit(CTDLEXIT_DB);
389                 }
390
391
392                 /* Arbitrary names for our tables -- we reference them by
393                  * number, so we don't have string names for them.
394                  */
395                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
396
397                 ret = dbp[i]->open(dbp[i],
398                                    NULL,
399                                    dbfilename,
400                                    NULL,
401                                    DB_BTREE,
402                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
403                                    0600
404                 );
405                 if (ret) {
406                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
407                         if (ret == ENOMEM) {
408                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
409                         }
410                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
411                         exit(CTDLEXIT_DB);
412                 }
413         }
414
415 }
416
417
418 /* Make sure we own all the files, because in a few milliseconds
419  * we're going to drop root privs.
420  */
421 void cdb_chmod_data(void) {
422         DIR *dp;
423         struct dirent *d;
424         char filename[PATH_MAX];
425
426         dp = opendir(ctdl_data_dir);
427         if (dp != NULL) {
428                 while (d = readdir(dp), d != NULL) {
429                         if (d->d_name[0] != '.') {
430                                 snprintf(filename, sizeof filename,
431                                          "%s/%s", ctdl_data_dir, d->d_name);
432                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
433                                         filename, chmod(filename, 0600)
434                                 );
435                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
436                                         filename, chown(filename, CTDLUID, (-1))
437                                 );
438                         }
439                 }
440                 closedir(dp);
441         }
442
443         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
444         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
445 }
446
447
448 /*
449  * Close all of the db database files we've opened.  This can be done
450  * in a loop, since it's just a bunch of closes.
451  */
452 void close_databases(void)
453 {
454         int a;
455         int ret;
456
457         ctdl_thread_internal_free_tsd();
458         
459         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
460                 CtdlLogPrintf(CTDL_EMERG,
461                         "txn_checkpoint: %s\n", db_strerror(ret));
462         }
463
464         /* print some statistics... */
465 #ifdef DB_STAT_ALL
466         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
467 #endif
468
469         /* close the tables */
470         for (a = 0; a < MAXCDB; ++a) {
471                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
472                 ret = dbp[a]->close(dbp[a], 0);
473                 if (ret) {
474                         CtdlLogPrintf(CTDL_EMERG, "db_close: %s\n", db_strerror(ret));
475                 }
476
477         }
478
479         /* Close the handle. */
480         ret = dbenv->close(dbenv, 0);
481         if (ret) {
482                 CtdlLogPrintf(CTDL_EMERG, "DBENV->close: %s\n", db_strerror(ret));
483         }
484 }
485
486
487 /*
488  * Compression functions only used if we have zlib
489  */
490 void cdb_decompress_if_necessary(struct cdbdata *cdb)
491 {
492         static int magic = COMPRESS_MAGIC;
493
494         if (cdb == NULL)
495                 return;
496         if (cdb->ptr == NULL)
497                 return;
498         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
499                 return;
500
501 #ifdef HAVE_ZLIB
502         /* At this point we know we're looking at a compressed item. */
503
504         struct CtdlCompressHeader zheader;
505         char *uncompressed_data;
506         char *compressed_data;
507         uLongf destLen, sourceLen;
508
509         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
510
511         compressed_data = cdb->ptr;
512         compressed_data += sizeof(struct CtdlCompressHeader);
513
514         sourceLen = (uLongf) zheader.compressed_len;
515         destLen = (uLongf) zheader.uncompressed_len;
516         uncompressed_data = malloc(zheader.uncompressed_len);
517
518         if (uncompress((Bytef *) uncompressed_data,
519                        (uLongf *) & destLen,
520                        (const Bytef *) compressed_data,
521                        (uLong) sourceLen) != Z_OK) {
522                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
523                 abort();
524         }
525
526         free(cdb->ptr);
527         cdb->len = (size_t) destLen;
528         cdb->ptr = uncompressed_data;
529 #else                           /* HAVE_ZLIB */
530         CtdlLogPrintf(CTDL_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n");
531         abort();
532 #endif                          /* HAVE_ZLIB */
533 }
534
535
536
537 /*
538  * Store a piece of data.  Returns 0 if the operation was successful.  If a
539  * key already exists it should be overwritten.
540  */
541 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
542 {
543
544         DBT dkey, ddata;
545         DB_TXN *tid;
546         int ret = 0;
547
548 #ifdef HAVE_ZLIB
549         struct CtdlCompressHeader zheader;
550         char *compressed_data = NULL;
551         int compressing = 0;
552         size_t buffer_len = 0;
553         uLongf destLen = 0;
554 #endif
555
556         memset(&dkey, 0, sizeof(DBT));
557         memset(&ddata, 0, sizeof(DBT));
558         dkey.size = ckeylen;
559         dkey.data = ckey;
560         ddata.size = cdatalen;
561         ddata.data = cdata;
562
563 #ifdef HAVE_ZLIB
564         /* Only compress Visit records.  Everything else is uncompressed. */
565         if (cdb == CDB_VISIT) {
566                 compressing = 1;
567                 zheader.magic = COMPRESS_MAGIC;
568                 zheader.uncompressed_len = cdatalen;
569                 buffer_len = ((cdatalen * 101) / 100) + 100
570                     + sizeof(struct CtdlCompressHeader);
571                 destLen = (uLongf) buffer_len;
572                 compressed_data = malloc(buffer_len);
573                 if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
574                         &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
575                 {
576                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
577                         abort();
578                 }
579                 zheader.compressed_len = (size_t) destLen;
580                 memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
581                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
582                 ddata.data = compressed_data;
583         }
584 #endif
585
586         if (MYTID != NULL) {
587                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
588                                     MYTID,      /* transaction ID */
589                                     &dkey,      /* key */
590                                     &ddata,     /* data */
591                                     0); /* flags */
592                 if (ret) {
593                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret));
594                         abort();
595                 }
596 #ifdef HAVE_ZLIB
597                 if (compressing)
598                         free(compressed_data);
599 #endif
600                 return ret;
601
602         } else {
603                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
604
605               retry:
606                 txbegin(&tid);
607
608                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
609                                          tid,   /* transaction ID */
610                                          &dkey, /* key */
611                                          &ddata,        /* data */
612                                          0))) { /* flags */
613                         if (ret == DB_LOCK_DEADLOCK) {
614                                 txabort(tid);
615                                 goto retry;
616                         } else {
617                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
618                                         cdb, db_strerror(ret));
619                                 abort();
620                         }
621                 } else {
622                         txcommit(tid);
623 #ifdef HAVE_ZLIB
624                         if (compressing) {
625                                 free(compressed_data);
626                         }
627 #endif
628                         return ret;
629                 }
630         }
631 }
632
633
634 /*
635  * Delete a piece of data.  Returns 0 if the operation was successful.
636  */
637 int cdb_delete(int cdb, void *key, int keylen)
638 {
639
640         DBT dkey;
641         DB_TXN *tid;
642         int ret;
643
644         memset(&dkey, 0, sizeof dkey);
645         dkey.size = keylen;
646         dkey.data = key;
647
648         if (MYTID != NULL) {
649                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
650                 if (ret) {
651                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
652                         if (ret != DB_NOTFOUND) {
653                                 abort();
654                         }
655                 }
656         } else {
657                 bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
658
659               retry:
660                 txbegin(&tid);
661
662                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
663                     && ret != DB_NOTFOUND) {
664                         if (ret == DB_LOCK_DEADLOCK) {
665                                 txabort(tid);
666                                 goto retry;
667                         } else {
668                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
669                                         cdb, db_strerror(ret));
670                                 abort();
671                         }
672                 } else {
673                         txcommit(tid);
674                 }
675         }
676         return ret;
677 }
678
679 static DBC *localcursor(int cdb)
680 {
681         int ret;
682         DBC *curs;
683
684         if (MYCURSORS[cdb] == NULL)
685                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
686         else
687                 ret =
688                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
689                                           DB_POSITION);
690
691         if (ret) {
692                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
693                 abort();
694         }
695
696         return curs;
697 }
698
699
700 /*
701  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
702  * a struct cdbdata which it is the caller's responsibility to free later on
703  * using the cdb_free() routine.
704  */
705 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
706 {
707
708         struct cdbdata *tempcdb;
709         DBT dkey, dret;
710         int ret;
711
712         memset(&dkey, 0, sizeof(DBT));
713         dkey.size = keylen;
714         dkey.data = key;
715
716         if (MYTID != NULL) {
717                 memset(&dret, 0, sizeof(DBT));
718                 dret.flags = DB_DBT_MALLOC;
719                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
720         } else {
721                 DBC *curs;
722
723                 do {
724                         memset(&dret, 0, sizeof(DBT));
725                         dret.flags = DB_DBT_MALLOC;
726
727                         curs = localcursor(cdb);
728
729                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
730                         cclose(curs);
731                 }
732                 while (ret == DB_LOCK_DEADLOCK);
733
734         }
735
736         if ((ret != 0) && (ret != DB_NOTFOUND)) {
737                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
738                 abort();
739         }
740
741         if (ret != 0)
742                 return NULL;
743         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
744
745         if (tempcdb == NULL) {
746                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
747                 abort();
748         }
749
750         tempcdb->len = dret.size;
751         tempcdb->ptr = dret.data;
752         cdb_decompress_if_necessary(tempcdb);
753         return (tempcdb);
754 }
755
756
757 /*
758  * Free a cdbdata item.
759  *
760  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
761  * other code to assume ownership of that memory simply by storing the
762  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
763  * avoid freeing it.
764  */
765 void cdb_free(struct cdbdata *cdb)
766 {
767         if (cdb->ptr) {
768                 free(cdb->ptr);
769         }
770         free(cdb);
771 }
772
773 void cdb_close_cursor(int cdb)
774 {
775         if (MYCURSORS[cdb] != NULL) {
776                 cclose(MYCURSORS[cdb]);
777         }
778
779         MYCURSORS[cdb] = NULL;
780 }
781
782 /* 
783  * Prepare for a sequential search of an entire database.
784  * (There is guaranteed to be no more than one traversal in
785  * progress per thread at any given time.)
786  */
787 void cdb_rewind(int cdb)
788 {
789         int ret = 0;
790
791         if (MYCURSORS[cdb] != NULL) {
792                 CtdlLogPrintf(CTDL_EMERG,
793                         "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
794                 abort();
795                 /* cclose(MYCURSORS[cdb]); */
796         }
797
798         /*
799          * Now initialize the cursor
800          */
801         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
802         if (ret) {
803                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
804                 abort();
805         }
806 }
807
808
809 /*
810  * Fetch the next item in a sequential search.  Returns a pointer to a 
811  * cdbdata structure, or NULL if we've hit the end.
812  */
813 struct cdbdata *cdb_next_item(int cdb)
814 {
815         DBT key, data;
816         struct cdbdata *cdbret;
817         int ret = 0;
818
819         /* Initialize the key/data pair so the flags aren't set. */
820         memset(&key, 0, sizeof(key));
821         memset(&data, 0, sizeof(data));
822         data.flags = DB_DBT_MALLOC;
823
824         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
825
826         if (ret) {
827                 if (ret != DB_NOTFOUND) {
828                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
829                         abort();
830                 }
831                 cclose(MYCURSORS[cdb]);
832                 MYCURSORS[cdb] = NULL;
833                 return NULL;    /* presumably, end of file */
834         }
835
836         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
837         cdbret->len = data.size;
838         cdbret->ptr = data.data;
839         cdb_decompress_if_necessary(cdbret);
840
841         return (cdbret);
842 }
843
844
845
846 /*
847  * Transaction-based stuff.  I'm writing this as I bake cookies...
848  */
849
850 void cdb_begin_transaction(void)
851 {
852
853         bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
854
855         if (MYTID != NULL) {
856                 CtdlLogPrintf(CTDL_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
857                 abort();
858         }
859
860         txbegin(&MYTID);
861 }
862
863 void cdb_end_transaction(void)
864 {
865         int i;
866
867         for (i = 0; i < MAXCDB; i++)
868                 if (MYCURSORS[i] != NULL) {
869                         CtdlLogPrintf(CTDL_WARNING,
870                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
871                                 i);
872                         cclose(MYCURSORS[i]);
873                         MYCURSORS[i] = NULL;
874                 }
875
876         if (MYTID == NULL) {
877                 CtdlLogPrintf(CTDL_EMERG,
878                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
879                 abort();
880         } else {
881                 txcommit(MYTID);
882         }
883
884         MYTID = NULL;
885 }
886
887 /*
888  * Truncate (delete every record)
889  */
890 void cdb_trunc(int cdb)
891 {
892         /* DB_TXN *tid; */
893         int ret;
894         u_int32_t count;
895
896         if (MYTID != NULL) {
897                 CtdlLogPrintf(CTDL_EMERG,
898                         "cdb_trunc must not be called in a transaction.\n");
899                 abort();
900         } else {
901                 bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
902
903               retry:
904                 /* txbegin(&tid); */
905
906                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
907                                               NULL,     /* transaction ID */
908                                               &count,   /* #rows deleted */
909                                               0))) {    /* flags */
910                         if (ret == DB_LOCK_DEADLOCK) {
911                                 /* txabort(tid); */
912                                 goto retry;
913                         } else {
914                                 CtdlLogPrintf(CTDL_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
915                                 if (ret == ENOMEM) {
916                                         CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
917                                 }
918                                 abort();
919                         }
920                 } else {
921                         /* txcommit(tid); */
922                 }
923         }
924 }