* When db->open() fails with ENOMEM, display a log message suggesting that the system...
[citadel.git] / citadel / database.c
1 /*
2  * $Id$
3  *
4  * This is a data store backend for the Citadel server which uses Berkeley DB.
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Berkeley DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31
32 #ifdef HAVE_DB_H
33 #include <db.h>
34 #elif defined(HAVE_DB4_DB_H)
35 #include <db4/db.h>
36 #else
37 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
38 #endif
39
40
41 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
42 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
43 #endif
44
45
46 #include <libcitadel.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "citserver.h"
50 #include "database.h"
51 #include "msgbase.h"
52 #include "sysdep_decls.h"
53 #include "threads.h"
54 #include "config.h"
55 #include "control.h"
56
57 #include "ctdl_module.h"
58
59
60 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
61 static DB_ENV *dbenv;           /* The DB environment (global) */
62
63
64 #ifdef HAVE_ZLIB
65 #include <zlib.h>
66 #endif
67
68
69 /* Verbose logging callback */
70 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
71 {
72         if (!IsEmptyStr(msg)) {
73                 CtdlLogPrintf(CTDL_DEBUG, "DB: %s\n", msg);
74         }
75 }
76
77
78 /* Verbose logging callback */
79 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
80 {
81         CtdlLogPrintf(CTDL_ALERT, "DB: %s\n", msg);
82 }
83
84
85 /* just a little helper function */
86 static void txabort(DB_TXN * tid)
87 {
88         int ret;
89
90         ret = tid->abort(tid);
91
92         if (ret) {
93                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_abort: %s\n",
94                         db_strerror(ret));
95                 abort();
96         }
97 }
98
99 /* this one is even more helpful than the last. */
100 static void txcommit(DB_TXN * tid)
101 {
102         int ret;
103
104         ret = tid->commit(tid, 0);
105
106         if (ret) {
107                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_commit: %s\n",
108                         db_strerror(ret));
109                 abort();
110         }
111 }
112
113 /* are you sensing a pattern yet? */
114 static void txbegin(DB_TXN ** tid)
115 {
116         int ret;
117
118         ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
119
120         if (ret) {
121                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: txn_begin: %s\n",
122                         db_strerror(ret));
123                 abort();
124         }
125 }
126
127 static void dbpanic(DB_ENV * env, int errval)
128 {
129         CtdlLogPrintf(CTDL_EMERG, "cdb_*: Berkeley DB panic: %d\n", errval);
130 }
131
132 static void cclose(DBC * cursor)
133 {
134         int ret;
135
136         if ((ret = cursor->c_close(cursor))) {
137                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: c_close: %s\n",
138                         db_strerror(ret));
139                 abort();
140         }
141 }
142
143 static void bailIfCursor(DBC ** cursors, const char *msg)
144 {
145         int i;
146
147         for (i = 0; i < MAXCDB; i++)
148                 if (cursors[i] != NULL) {
149                         CtdlLogPrintf(CTDL_EMERG,
150                                 "cdb_*: cursor still in progress on cdb %02x: %s\n",
151                                 i, msg);
152                         abort();
153                 }
154 }
155
156 void check_handles(void *arg)
157 {
158         if (arg != NULL) {
159                 ThreadTSD *tsd = (ThreadTSD *) arg;
160
161                 bailIfCursor(tsd->cursors, "in check_handles");
162
163                 if (tsd->tid != NULL) {
164                         CtdlLogPrintf(CTDL_EMERG,
165                                 "cdb_*: transaction still in progress!");
166                         abort();
167                 }
168         }
169 }
170
171 void cdb_check_handles(void)
172 {
173         check_handles(pthread_getspecific(ThreadKey));
174 }
175
176
177 /*
178  * Cull the database logs
179  */
180 static void cdb_cull_logs(void)
181 {
182         u_int32_t flags;
183         int ret;
184         char **file, **list;
185         char errmsg[SIZ];
186
187         flags = DB_ARCH_ABS;
188
189         /* Get the list of names. */
190         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
191                 CtdlLogPrintf(CTDL_ERR, "cdb_cull_logs: %s\n", db_strerror(ret));
192                 return;
193         }
194
195         /* Print the list of names. */
196         if (list != NULL) {
197                 for (file = list; *file != NULL; ++file) {
198                         CtdlLogPrintf(CTDL_DEBUG, "Deleting log: %s\n", *file);
199                         ret = unlink(*file);
200                         if (ret != 0) {
201                                 snprintf(errmsg, sizeof(errmsg),
202                                          " ** ERROR **\n \n \n "
203                                          "Citadel was unable to delete the "
204                                          "database log file '%s' because of the "
205                                          "following error:\n \n %s\n \n"
206                                          " This log file is no longer in use "
207                                          "and may be safely deleted.\n",
208                                          *file, strerror(errno));
209                                 aide_message(errmsg, "Database Warning Message");
210                         }
211                 }
212                 free(list);
213         }
214 }
215
216 /*
217  * Manually initiate log file cull.
218  */
219 void cmd_cull(char *argbuf) {
220         if (CtdlAccessCheck(ac_internal)) return;
221         cdb_cull_logs();
222         cprintf("%d Database log file cull completed.\n", CIT_OK);
223 }
224
225
226 /*
227  * Request a checkpoint of the database.  Called once per minute by the thread manager.
228  */
229 void cdb_checkpoint(void)
230 {
231         int ret;
232
233         CtdlLogPrintf(CTDL_DEBUG, "-- db checkpoint --\n");
234         ret = dbenv->txn_checkpoint(dbenv,
235                                     MAX_CHECKPOINT_KBYTES,
236                                     MAX_CHECKPOINT_MINUTES, 0);
237
238         if (ret != 0) {
239                 CtdlLogPrintf(CTDL_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n",
240                         db_strerror(ret));
241                 abort();
242         }
243
244         /* After a successful checkpoint, we can cull the unused logs */
245         if (config.c_auto_cull) {
246                 cdb_cull_logs();
247         }
248 }
249
250
251
252 /*
253  * Open the various databases we'll be using.  Any database which
254  * does not exist should be created.  Note that we don't need a
255  * critical section here, because there aren't any active threads
256  * manipulating the database yet.
257  */
258 void open_databases(void)
259 {
260         int ret;
261         int i;
262         char dbfilename[32];
263         u_int32_t flags = 0;
264         int dbversion_major, dbversion_minor, dbversion_patch;
265         int current_dbversion = 0;
266
267         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: open_databases() starting\n");
268         CtdlLogPrintf(CTDL_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING);
269         CtdlLogPrintf(CTDL_INFO, "  Linked db: %s\n",
270                 db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
271
272         current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
273
274         CtdlLogPrintf(CTDL_DEBUG, "Calculated dbversion: %d\n", current_dbversion);
275         CtdlLogPrintf(CTDL_DEBUG, "  Previous dbversion: %d\n", CitControl.MMdbversion);
276
277         if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
278            && (CitControl.MMdbversion > current_dbversion) ) {
279                 CtdlLogPrintf(CTDL_EMERG, "You are attempting to run the Citadel server using a version\n"
280                                         "of Berkeley DB that is older than that which last created or\n"
281                                         "updated the database.  Because this would probably cause data\n"
282                                         "corruption or loss, the server is aborting execution now.\n");
283                 exit(CTDLEXIT_DB);
284         }
285
286         CitControl.MMdbversion = current_dbversion;
287         put_control();
288
289 #ifdef HAVE_ZLIB
290         CtdlLogPrintf(CTDL_INFO, "Linked zlib: %s\n", zlibVersion());
291 #endif
292
293         /*
294          * Silently try to create the database subdirectory.  If it's
295          * already there, no problem.
296          */
297         mkdir(ctdl_data_dir, 0700);
298         chmod(ctdl_data_dir, 0700);
299         chown(ctdl_data_dir, CTDLUID, (-1));
300
301         CtdlLogPrintf(CTDL_DEBUG, "cdb_*: Setting up DB environment\n");
302         db_env_set_func_yield(sched_yield);
303         ret = db_env_create(&dbenv, 0);
304         if (ret) {
305                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: db_env_create: %s\n", db_strerror(ret));
306                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
307                 exit(CTDLEXIT_DB);
308         }
309         dbenv->set_errpfx(dbenv, "citserver");
310         dbenv->set_paniccall(dbenv, dbpanic);
311         dbenv->set_errcall(dbenv, cdb_verbose_err);
312         dbenv->set_errpfx(dbenv, "ctdl");
313 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
314         dbenv->set_msgcall(dbenv, cdb_verbose_log);
315 #endif
316         dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
317         dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
318
319         /*
320          * We want to specify the shared memory buffer pool cachesize,
321          * but everything else is the default.
322          */
323         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
324         if (ret) {
325                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
326                 dbenv->close(dbenv, 0);
327                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
328                 exit(CTDLEXIT_DB);
329         }
330
331         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
332                 CtdlLogPrintf(CTDL_EMERG, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
333                 dbenv->close(dbenv, 0);
334                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
335                 exit(CTDLEXIT_DB);
336         }
337
338         flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
339         CtdlLogPrintf(CTDL_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
340         ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
341         if (ret == DB_RUNRECOVERY) {
342                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
343                 CtdlLogPrintf(CTDL_ALERT, "Attempting recovery...\n");
344                 flags |= DB_RECOVER;
345                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
346         }
347         if (ret == DB_RUNRECOVERY) {
348                 CtdlLogPrintf(CTDL_ALERT, "dbenv->open: %s\n", db_strerror(ret));
349                 CtdlLogPrintf(CTDL_ALERT, "Attempting catastrophic recovery...\n");
350                 flags &= ~DB_RECOVER;
351                 flags |= DB_RECOVER_FATAL;
352                 ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
353         }
354         if (ret) {
355                 CtdlLogPrintf(CTDL_EMERG, "dbenv->open: %s\n", db_strerror(ret));
356                 dbenv->close(dbenv, 0);
357                 CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
358                 exit(CTDLEXIT_DB);
359         }
360
361         CtdlLogPrintf(CTDL_INFO, "Starting up DB\n");
362
363         for (i = 0; i < MAXCDB; ++i) {
364
365                 /* Create a database handle */
366                 ret = db_create(&dbp[i], dbenv, 0);
367                 if (ret) {
368                         CtdlLogPrintf(CTDL_EMERG, "db_create: %s\n", db_strerror(ret));
369                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
370                         exit(CTDLEXIT_DB);
371                 }
372
373
374                 /* Arbitrary names for our tables -- we reference them by
375                  * number, so we don't have string names for them.
376                  */
377                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
378
379                 ret = dbp[i]->open(dbp[i],
380                                    NULL,
381                                    dbfilename,
382                                    NULL,
383                                    DB_BTREE,
384                                    DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
385                                    0600);
386                 if (ret) {
387                         CtdlLogPrintf(CTDL_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
388                         if (ret == ENOMEM) {
389                                 CtdlLogPrintf(CTDL_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n");
390                         }
391                         CtdlLogPrintf(CTDL_EMERG, "exit code %d\n", ret);
392                         exit(CTDLEXIT_DB);
393                 }
394         }
395
396 }
397
398
399 /* Make sure we own all the files, because in a few milliseconds
400  * we're going to drop root privs.
401  */
402 void cdb_chmod_data(void) {
403         DIR *dp;
404         struct dirent *d;
405         char filename[PATH_MAX];
406
407         dp = opendir(ctdl_data_dir);
408         if (dp != NULL) {
409                 while (d = readdir(dp), d != NULL) {
410                         if (d->d_name[0] != '.') {
411                                 snprintf(filename, sizeof filename,
412                                          "%s/%s", ctdl_data_dir, d->d_name);
413                                 CtdlLogPrintf(9, "chmod(%s, 0600) returned %d\n",
414                                         filename, chmod(filename, 0600)
415                                 );
416                                 CtdlLogPrintf(9, "chown(%s, CTDLUID, -1) returned %d\n",
417                                         filename, chown(filename, CTDLUID, (-1))
418                                 );
419                         }
420                 }
421                 closedir(dp);
422         }
423
424         CtdlLogPrintf(CTDL_DEBUG, "open_databases() finished\n");
425
426         CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
427 }
428
429
430 /*
431  * Close all of the db database files we've opened.  This can be done
432  * in a loop, since it's just a bunch of closes.
433  */
434 void close_databases(void)
435 {
436         int a;
437         int ret;
438
439         ctdl_thread_internal_free_tsd();
440         
441         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
442                 CtdlLogPrintf(CTDL_EMERG,
443                         "txn_checkpoint: %s\n", db_strerror(ret));
444         }
445
446         /* print some statistics... */
447 #ifdef DB_STAT_ALL
448         dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
449 #endif
450
451         /* close the tables */
452         for (a = 0; a < MAXCDB; ++a) {
453                 CtdlLogPrintf(CTDL_INFO, "Closing database %02x\n", a);
454                 ret = dbp[a]->close(dbp[a], 0);
455                 if (ret) {
456                         CtdlLogPrintf(CTDL_EMERG,
457                                 "db_close: %s\n", db_strerror(ret));
458                 }
459
460         }
461
462         /* Close the handle. */
463         ret = dbenv->close(dbenv, 0);
464         if (ret) {
465                 CtdlLogPrintf(CTDL_EMERG,
466                         "DBENV->close: %s\n", db_strerror(ret));
467         }
468 }
469
470
471 /*
472  * Compression functions only used if we have zlib
473  */
474 #ifdef HAVE_ZLIB
475
476 void cdb_decompress_if_necessary(struct cdbdata *cdb)
477 {
478         static int magic = COMPRESS_MAGIC;
479         struct CtdlCompressHeader zheader;
480         char *uncompressed_data;
481         char *compressed_data;
482         uLongf destLen, sourceLen;
483
484         if (cdb == NULL)
485                 return;
486         if (cdb->ptr == NULL)
487                 return;
488         if (memcmp(cdb->ptr, &magic, sizeof(magic)))
489                 return;
490
491         /* At this point we know we're looking at a compressed item. */
492         memcpy(&zheader, cdb->ptr, sizeof(struct CtdlCompressHeader));
493
494         compressed_data = cdb->ptr;
495         compressed_data += sizeof(struct CtdlCompressHeader);
496
497         sourceLen = (uLongf) zheader.compressed_len;
498         destLen = (uLongf) zheader.uncompressed_len;
499         uncompressed_data = malloc(zheader.uncompressed_len);
500
501         if (uncompress((Bytef *) uncompressed_data,
502                        (uLongf *) & destLen,
503                        (const Bytef *) compressed_data,
504                        (uLong) sourceLen) != Z_OK) {
505                 CtdlLogPrintf(CTDL_EMERG, "uncompress() error\n");
506                 abort();
507         }
508
509         free(cdb->ptr);
510         cdb->len = (size_t) destLen;
511         cdb->ptr = uncompressed_data;
512 }
513
514 #endif                          /* HAVE_ZLIB */
515
516
517 /*
518  * Store a piece of data.  Returns 0 if the operation was successful.  If a
519  * key already exists it should be overwritten.
520  */
521 int cdb_store(int cdb, void *ckey, int ckeylen, void *cdata, int cdatalen)
522 {
523
524         DBT dkey, ddata;
525         DB_TXN *tid;
526         int ret = 0;
527
528 #ifdef HAVE_ZLIB
529         struct CtdlCompressHeader zheader;
530         char *compressed_data = NULL;
531         int compressing = 0;
532         size_t buffer_len = 0;
533         uLongf destLen = 0;
534 #endif
535
536         memset(&dkey, 0, sizeof(DBT));
537         memset(&ddata, 0, sizeof(DBT));
538         dkey.size = ckeylen;
539         dkey.data = ckey;
540         ddata.size = cdatalen;
541         ddata.data = cdata;
542
543 #ifdef HAVE_ZLIB
544         /* Only compress Visit records.  Everything else is uncompressed. */
545         if (cdb == CDB_VISIT) {
546                 compressing = 1;
547                 zheader.magic = COMPRESS_MAGIC;
548                 zheader.uncompressed_len = cdatalen;
549                 buffer_len = ((cdatalen * 101) / 100) + 100
550                     + sizeof(struct CtdlCompressHeader);
551                 destLen = (uLongf) buffer_len;
552                 compressed_data = malloc(buffer_len);
553                 if (compress2((Bytef *) (compressed_data +
554                                          sizeof(struct
555                                                 CtdlCompressHeader)),
556                               &destLen, (Bytef *) cdata, (uLongf) cdatalen,
557                               1) != Z_OK) {
558                         CtdlLogPrintf(CTDL_EMERG, "compress2() error\n");
559                         abort();
560                 }
561                 zheader.compressed_len = (size_t) destLen;
562                 memcpy(compressed_data, &zheader,
563                        sizeof(struct CtdlCompressHeader));
564                 ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) +
565                                        zheader.compressed_len);
566                 ddata.data = compressed_data;
567         }
568 #endif
569
570         if (MYTID != NULL) {
571                 ret = dbp[cdb]->put(dbp[cdb],   /* db */
572                                     MYTID,      /* transaction ID */
573                                     &dkey,      /* key */
574                                     &ddata,     /* data */
575                                     0); /* flags */
576                 if (ret) {
577                         CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n", cdb,
578                                 db_strerror(ret));
579                         abort();
580                 }
581 #ifdef HAVE_ZLIB
582                 if (compressing)
583                         free(compressed_data);
584 #endif
585                 return ret;
586
587         } else {
588                 bailIfCursor(MYCURSORS,
589                              "attempt to write during r/o cursor");
590
591               retry:
592                 txbegin(&tid);
593
594                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
595                                          tid,   /* transaction ID */
596                                          &dkey, /* key */
597                                          &ddata,        /* data */
598                                          0))) { /* flags */
599                         if (ret == DB_LOCK_DEADLOCK) {
600                                 txabort(tid);
601                                 goto retry;
602                         } else {
603                                 CtdlLogPrintf(CTDL_EMERG, "cdb_store(%d): %s\n",
604                                         cdb, db_strerror(ret));
605                                 abort();
606                         }
607                 } else {
608                         txcommit(tid);
609 #ifdef HAVE_ZLIB
610                         if (compressing)
611                                 free(compressed_data);
612 #endif
613                         return ret;
614                 }
615         }
616 }
617
618
619 /*
620  * Delete a piece of data.  Returns 0 if the operation was successful.
621  */
622 int cdb_delete(int cdb, void *key, int keylen)
623 {
624
625         DBT dkey;
626         DB_TXN *tid;
627         int ret;
628
629         memset(&dkey, 0, sizeof dkey);
630         dkey.size = keylen;
631         dkey.data = key;
632
633         if (MYTID != NULL) {
634                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
635                 if (ret) {
636                         CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n", cdb,
637                                 db_strerror(ret));
638                         if (ret != DB_NOTFOUND)
639                                 abort();
640                 }
641         } else {
642                 bailIfCursor(MYCURSORS,
643                              "attempt to delete during r/o cursor");
644
645               retry:
646                 txbegin(&tid);
647
648                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
649                     && ret != DB_NOTFOUND) {
650                         if (ret == DB_LOCK_DEADLOCK) {
651                                 txabort(tid);
652                                 goto retry;
653                         } else {
654                                 CtdlLogPrintf(CTDL_EMERG, "cdb_delete(%d): %s\n",
655                                         cdb, db_strerror(ret));
656                                 abort();
657                         }
658                 } else {
659                         txcommit(tid);
660                 }
661         }
662         return ret;
663 }
664
665 static DBC *localcursor(int cdb)
666 {
667         int ret;
668         DBC *curs;
669
670         if (MYCURSORS[cdb] == NULL)
671                 ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
672         else
673                 ret =
674                     MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs,
675                                           DB_POSITION);
676
677         if (ret) {
678                 CtdlLogPrintf(CTDL_EMERG, "localcursor: %s\n", db_strerror(ret));
679                 abort();
680         }
681
682         return curs;
683 }
684
685
686 /*
687  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
688  * a struct cdbdata which it is the caller's responsibility to free later on
689  * using the cdb_free() routine.
690  */
691 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
692 {
693
694         struct cdbdata *tempcdb;
695         DBT dkey, dret;
696         int ret;
697
698         memset(&dkey, 0, sizeof(DBT));
699         dkey.size = keylen;
700         dkey.data = key;
701
702         if (MYTID != NULL) {
703                 memset(&dret, 0, sizeof(DBT));
704                 dret.flags = DB_DBT_MALLOC;
705                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
706         } else {
707                 DBC *curs;
708
709                 do {
710                         memset(&dret, 0, sizeof(DBT));
711                         dret.flags = DB_DBT_MALLOC;
712
713                         curs = localcursor(cdb);
714
715                         ret = curs->c_get(curs, &dkey, &dret, DB_SET);
716                         cclose(curs);
717                 }
718                 while (ret == DB_LOCK_DEADLOCK);
719
720         }
721
722         if ((ret != 0) && (ret != DB_NOTFOUND)) {
723                 CtdlLogPrintf(CTDL_EMERG, "cdb_fetch(%d): %s\n", cdb,
724                         db_strerror(ret));
725                 abort();
726         }
727
728         if (ret != 0)
729                 return NULL;
730         tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
731
732         if (tempcdb == NULL) {
733                 CtdlLogPrintf(CTDL_EMERG,
734                         "cdb_fetch: Cannot allocate memory for tempcdb\n");
735                 abort();
736         }
737
738         tempcdb->len = dret.size;
739         tempcdb->ptr = dret.data;
740 #ifdef HAVE_ZLIB
741         cdb_decompress_if_necessary(tempcdb);
742 #endif
743         return (tempcdb);
744 }
745
746
747 /*
748  * Free a cdbdata item.
749  *
750  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
751  * other code to assume ownership of that memory simply by storing the
752  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
753  * avoid freeing it.
754  */
755 void cdb_free(struct cdbdata *cdb)
756 {
757         if (cdb->ptr) {
758                 free(cdb->ptr);
759         }
760         free(cdb);
761 }
762
763 void cdb_close_cursor(int cdb)
764 {
765         if (MYCURSORS[cdb] != NULL)
766                 cclose(MYCURSORS[cdb]);
767
768         MYCURSORS[cdb] = NULL;
769 }
770
771 /* 
772  * Prepare for a sequential search of an entire database.
773  * (There is guaranteed to be no more than one traversal in
774  * progress per thread at any given time.)
775  */
776 void cdb_rewind(int cdb)
777 {
778         int ret = 0;
779
780         if (MYCURSORS[cdb] != NULL) {
781                 CtdlLogPrintf(CTDL_EMERG,
782                         "cdb_rewind: must close cursor on database %d before reopening.\n",
783                         cdb);
784                 abort();
785                 /* cclose(MYCURSORS[cdb]); */
786         }
787
788         /*
789          * Now initialize the cursor
790          */
791         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
792         if (ret) {
793                 CtdlLogPrintf(CTDL_EMERG, "cdb_rewind: db_cursor: %s\n",
794                         db_strerror(ret));
795                 abort();
796         }
797 }
798
799
800 /*
801  * Fetch the next item in a sequential search.  Returns a pointer to a 
802  * cdbdata structure, or NULL if we've hit the end.
803  */
804 struct cdbdata *cdb_next_item(int cdb)
805 {
806         DBT key, data;
807         struct cdbdata *cdbret;
808         int ret = 0;
809
810         /* Initialize the key/data pair so the flags aren't set. */
811         memset(&key, 0, sizeof(key));
812         memset(&data, 0, sizeof(data));
813         data.flags = DB_DBT_MALLOC;
814
815         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT);
816
817         if (ret) {
818                 if (ret != DB_NOTFOUND) {
819                         CtdlLogPrintf(CTDL_EMERG, "cdb_next_item(%d): %s\n",
820                                 cdb, db_strerror(ret));
821                         abort();
822                 }
823                 cclose(MYCURSORS[cdb]);
824                 MYCURSORS[cdb] = NULL;
825                 return NULL;    /* presumably, end of file */
826         }
827
828         cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
829         cdbret->len = data.size;
830         cdbret->ptr = data.data;
831 #ifdef HAVE_ZLIB
832         cdb_decompress_if_necessary(cdbret);
833 #endif
834
835         return (cdbret);
836 }
837
838
839
840 /*
841  * Transaction-based stuff.  I'm writing this as I bake cookies...
842  */
843
844 void cdb_begin_transaction(void)
845 {
846
847         bailIfCursor(MYCURSORS,
848                      "can't begin transaction during r/o cursor");
849
850         if (MYTID != NULL) {
851                 CtdlLogPrintf(CTDL_EMERG,
852                         "cdb_begin_transaction: ERROR: nested transaction\n");
853                 abort();
854         }
855
856         txbegin(&MYTID);
857 }
858
859 void cdb_end_transaction(void)
860 {
861         int i;
862
863         for (i = 0; i < MAXCDB; i++)
864                 if (MYCURSORS[i] != NULL) {
865                         CtdlLogPrintf(CTDL_WARNING,
866                                 "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
867                                 i);
868                         cclose(MYCURSORS[i]);
869                         MYCURSORS[i] = NULL;
870                 }
871
872         if (MYTID == NULL) {
873                 CtdlLogPrintf(CTDL_EMERG,
874                         "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
875                 abort();
876         } else
877                 txcommit(MYTID);
878
879         MYTID = NULL;
880 }
881
882 /*
883  * Truncate (delete every record)
884  */
885 void cdb_trunc(int cdb)
886 {
887         /* DB_TXN *tid; */
888         int ret;
889         u_int32_t count;
890
891         if (MYTID != NULL) {
892                 CtdlLogPrintf(CTDL_EMERG,
893                         "cdb_trunc must not be called in a transaction.\n");
894                 abort();
895         } else {
896                 bailIfCursor(MYCURSORS,
897                              "attempt to write during r/o cursor");
898
899               retry:
900                 /* txbegin(&tid); */
901
902                 if ((ret = dbp[cdb]->truncate(dbp[cdb], /* db */
903                                               NULL,     /* transaction ID */
904                                               &count,   /* #rows deleted */
905                                               0))) {    /* flags */
906                         if (ret == DB_LOCK_DEADLOCK) {
907                                 /* txabort(tid); */
908                                 goto retry;
909                         } else {
910                                 CtdlLogPrintf(CTDL_EMERG,
911                                         "cdb_truncate(%d): %s\n", cdb,
912                                         db_strerror(ret));
913                                 abort();
914                         }
915                 } else {
916                         /* txcommit(tid); */
917                 }
918         }
919 }