9f9670217df9713a9194c8de087c7424d0093c61
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   256
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #ifdef DLL_EXPORT
22 #define IN_LIBCIT
23 #endif
24
25 #include "sysdep.h"
26 #include <stdlib.h>
27 #include <unistd.h>
28 #include <stdio.h>
29 #include <ctype.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <dirent.h>
35
36 #ifdef HAVE_DB_H
37 #include <db.h>
38 #elif defined(HAVE_DB4_DB_H)
39 #include <db4/db.h>
40 #elif defined(HAVE_DB3_DB_H)
41 #include <db3/db.h>
42 #else
43 #error Neither <db.h> nor <db3/db.h> was found by configure. Install db3-devel.
44 #endif
45
46 #include <pthread.h>
47 #include "citadel.h"
48 #include "server.h"
49 #include "dynloader.h"
50 #include "citserver.h"
51 #include "database.h"
52 #include "msgbase.h"
53 #include "sysdep_decls.h"
54
55 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
56 static DB_ENV *dbenv;           /* The DB environment (global) */
57
58 struct cdbtsd {                 /* Thread-specific DB stuff */
59         DB_TXN *tid;            /* Transaction handle */
60         DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
61 };
62
63 static pthread_key_t tsdkey;
64
65 #define MYCURSORS       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors)
66 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
67
68 /* just a little helper function */
69 static void txabort(DB_TXN *tid) {
70         int ret = txn_abort(tid);
71
72         if (ret) {
73                 lprintf(1, "cdb_*: txn_abort: %s\n", db_strerror(ret));
74                 abort();
75         }
76 }
77
78 /* this one is even more helpful than the last. */
79 static void txcommit(DB_TXN *tid) {
80         int ret = txn_commit(tid, 0);
81
82         if (ret) {
83                 lprintf(1, "cdb_*: txn_commit: %s\n", db_strerror(ret));
84                 abort();
85         }
86 }
87
88 /* are you sensing a pattern yet? */
89 static void txbegin(DB_TXN **tid) {
90         int ret = txn_begin(dbenv, NULL, tid, 0);
91
92         if (ret) {
93                 lprintf(1, "cdb_*: txn_begin: %s\n", db_strerror(ret));
94                 abort();
95         }
96 }
97
98 static void cclose(DBC *cursor) {
99         int ret;
100
101         if ((ret = cursor->c_close(cursor))) {
102                 lprintf(1, "cdb_*: c_close: %s\n", db_strerror(ret));
103                 abort();
104         }
105 }
106
107 static void bailIfCursor(DBC **cursors, const char *msg)
108 {
109   int i;
110
111   for (i = 0; i < MAXCDB; i++)
112     if (cursors[i] != NULL)
113       {
114         lprintf(1, "cdb_*: cursor still in progress on cdb %d: %s\n", i, msg);
115         abort();
116       }
117 }
118
119 static void check_handles(void *arg) {
120         if (arg != NULL) {
121                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
122
123                 bailIfCursor(tsd->cursors, "in check_handles");
124
125                 if (tsd->tid != NULL) {
126                         lprintf(1, "cdb_*: transaction still in progress!");
127                         abort();
128                 }
129         }
130 }
131
132 static void dest_tsd(void *arg) {
133         if (arg != NULL) {
134                 check_handles(arg);
135                 phree(arg);
136         }
137 }
138
139 /*
140  * Ensure that we have a key for thread-specific data.  We don't
141  * put anything in here that Citadel cares about; this is just database
142  * related stuff like cursors and transactions.
143  *
144  * This should be called immediately after startup by any thread which wants
145  * to use database calls, except for whatever thread calls open_databases.
146  */
147 void cdb_allocate_tsd(void) {
148         struct cdbtsd *tsd;
149
150         if (pthread_getspecific(tsdkey) != NULL)
151                 return;
152
153         tsd = mallok(sizeof *tsd);
154
155         tsd->tid = NULL;
156
157         memset(tsd->cursors, 0, sizeof tsd->cursors);
158         pthread_setspecific(tsdkey, tsd);
159 }
160
161 void cdb_free_tsd(void) {
162         dest_tsd(pthread_getspecific(tsdkey));
163         pthread_setspecific(tsdkey, NULL);
164 }
165
166 void cdb_check_handles(void) {
167         check_handles(pthread_getspecific(tsdkey));
168 }
169
170
171 /*
172  * Reclaim unused space in the databases.  We need to do each one of
173  * these discretely, rather than in a loop.
174  *
175  * This is a stub function in the Sleepycat DB backend, because there is no
176  * such API call available.
177  */
178 void defrag_databases(void)
179 {
180         /* do nothing */
181 }
182
183
184 /*
185  * Cull the database logs
186  */
187 static void cdb_cull_logs(void) {
188         u_int32_t flags;
189         int ret;
190         char **file, **list;
191         char errmsg[SIZ];
192
193         lprintf(5, "Database log file cull started.\n");
194
195         flags = DB_ARCH_ABS;
196
197         /* Get the list of names. */
198 #if DB_VERSION_MAJOR == 3 && DB_VERSION_MINOR < 3
199         if ((ret = log_archive(dbenv, &list, flags, NULL)) != 0) {
200 #elif DB_VERSION_MAJOR >= 4
201         if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
202 #else
203         if ((ret = log_archive(dbenv, &list, flags)) != 0) {
204 #endif
205                 lprintf(1, "cdb_cull_logs: %s\n", db_strerror(ret));
206                 return;
207         }
208
209         /* Print the list of names. */
210         if (list != NULL) {
211                 for (file = list; *file != NULL; ++file) {
212                         lprintf(9, "Deleting log: %s\n", *file);
213                         ret = unlink(*file);
214                         if (ret != 0) {
215                                 snprintf(errmsg, sizeof(errmsg),
216                                         " ** ERROR **\n \n \n "
217                                         "Citadel was unable to delete the "
218                                         "database log file '%s' because of the "
219                                         "following error:\n \n %s\n \n"
220                                         " This log file is no longer in use "
221                                         "and may be safely deleted.\n",
222                                         *file,
223                                         strerror(errno));
224                                 aide_message(errmsg);
225                         }
226                 }
227                 free(list);
228         }
229
230         lprintf(5, "Database log file cull ended.\n");
231 }
232
233
234 /*
235  * Request a checkpoint of the database.
236  */
237 static void cdb_checkpoint(void) {
238         int ret;
239         static time_t last_cull = 0L;
240
241 #if DB_VERSION_MAJOR >= 4
242         ret = dbenv->txn_checkpoint(dbenv,
243 #else
244         ret = txn_checkpoint(dbenv,
245 #endif
246                                 MAX_CHECKPOINT_KBYTES,
247                                 MAX_CHECKPOINT_MINUTES,
248                                 0);
249         if ( (ret != 0) && (ret != DB_INCOMPLETE) ) {
250                 lprintf(1, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
251                 abort();
252         }
253
254         if (ret == DB_INCOMPLETE) {
255                 lprintf(3, "WARNING: txn_checkpoint: %s\n", db_strerror(ret));
256         }
257
258         /* Cull the logs if we haven't done so for 24 hours */
259         if ((time(NULL) - last_cull) > 86400L) {
260                 last_cull = time(NULL);
261                 cdb_cull_logs();
262         }
263
264 }
265
266 /*
267  * Open the various databases we'll be using.  Any database which
268  * does not exist should be created.  Note that we don't need an S_DATABASE
269  * critical section here, because there aren't any active threads manipulating
270  * the database yet -- and besides, it causes problems on BSDI.
271  */
272 void open_databases(void)
273 {
274         int ret;
275         int i;
276         char dbfilename[SIZ];
277         u_int32_t flags = 0;
278
279         lprintf(9, "cdb_*: open_databases() starting\n");
280         /*
281          * Silently try to create the database subdirectory.  If it's
282          * already there, no problem.
283          */
284         system("exec mkdir data 2>/dev/null");
285
286         lprintf(9, "cdb_*: Setting up DB environment\n");
287         db_env_set_func_yield(sched_yield);
288         ret = db_env_create(&dbenv, 0);
289         if (ret) {
290                 lprintf(1, "cdb_*: db_env_create: %s\n", db_strerror(ret));
291                 exit(ret);
292         }
293         dbenv->set_errpfx(dbenv, "citserver");
294
295         /*
296          * We want to specify the shared memory buffer pool cachesize,
297          * but everything else is the default.
298          */
299         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
300         if (ret) {
301                 lprintf(1, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
302                 dbenv->close(dbenv, 0);
303                 exit(ret);
304         }
305
306         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
307                 lprintf(1, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
308                 dbenv->close(dbenv, 0);
309                 exit(ret);
310         }
311
312         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
313                 DB_INIT_LOCK|DB_THREAD;
314         ret = dbenv->open(dbenv, "./data", flags, 0);
315         if (ret) {
316                 lprintf(1, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
317                 dbenv->close(dbenv, 0);
318                 exit(ret);
319         }
320
321         lprintf(7, "cdb_*: Starting up DB\n");
322
323         for (i = 0; i < MAXCDB; ++i) {
324
325                 /* Create a database handle */
326                 ret = db_create(&dbp[i], dbenv, 0);
327                 if (ret) {
328                         lprintf(1, "cdb_*: db_create: %s\n", db_strerror(ret));
329                         exit(ret);
330                 }
331
332
333                 /* Arbitrary names for our tables -- we reference them by
334                  * number, so we don't have string names for them.
335                  */
336                 snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
337
338                 ret = dbp[i]->open(dbp[i],
339                                 dbfilename,
340                                 NULL,
341                                 DB_BTREE,
342                                 DB_CREATE|DB_THREAD,
343                                 0600);
344                 if (ret) {
345                         lprintf(1, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
346                         exit(ret);
347                 }
348         }
349
350         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
351                 lprintf(1, "cdb_*: pthread_key_create: %s\n", strerror(ret));
352                 exit(1);
353         }
354
355         cdb_allocate_tsd();
356         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
357         lprintf(9, "cdb_*: open_databases() finished\n");
358 }
359
360
361 /*
362  * Close all of the db database files we've opened.  This can be done
363  * in a loop, since it's just a bunch of closes.
364  */
365 void close_databases(void)
366 {
367         int a;
368         int ret;
369
370         cdb_free_tsd();
371
372 #if DB_VERSION_MAJOR >= 4
373         if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
374 #else
375         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
376 #endif
377                 lprintf(1, "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
378                 abort();
379         }
380
381         for (a = 0; a < MAXCDB; ++a) {
382                 lprintf(7, "cdb_*: Closing database %d\n", a);
383                 ret = dbp[a]->close(dbp[a], 0);
384                 if (ret) {
385                         lprintf(1, "cdb_*: db_close: %s\n", db_strerror(ret));
386                         abort();
387                 }
388                 
389         }
390
391         /* Close the handle. */
392         ret = dbenv->close(dbenv, 0);
393         if (ret) {
394                 lprintf(1, "cdb_*: DBENV->close: %s\n", db_strerror(ret));
395                 abort();
396         }
397 }
398
399 /*
400  * Store a piece of data.  Returns 0 if the operation was successful.  If a
401  * key already exists it should be overwritten.
402  */
403 int cdb_store(int cdb,
404               void *ckey, int ckeylen,
405               void *cdata, int cdatalen)
406 {
407
408   DBT dkey, ddata;
409   DB_TXN *tid;
410   int ret;
411   
412   memset(&dkey, 0, sizeof(DBT));
413   memset(&ddata, 0, sizeof(DBT));
414   dkey.size = ckeylen;
415   dkey.data = ckey;
416   ddata.size = cdatalen;
417   ddata.data = cdata;
418   
419   if (MYTID != NULL)
420     {
421       ret = dbp[cdb]->put(dbp[cdb],             /* db */
422                           MYTID,                /* transaction ID */
423                           &dkey,                /* key */
424                           &ddata,               /* data */
425                           0);           /* flags */
426       if (ret)
427         {
428           lprintf(1, "cdb_store(%d): %s\n", cdb,
429                   db_strerror(ret));
430           abort();
431         }
432       return ret;
433       
434     }
435   else
436     {
437       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
438       
439     retry:
440       txbegin(&tid);
441       
442       if ((ret = dbp[cdb]->put(dbp[cdb],    /* db */
443                                tid,         /* transaction ID */
444                                &dkey,       /* key */
445                                &ddata,      /* data */
446                                0)))         /* flags */
447         {
448           if (ret == DB_LOCK_DEADLOCK)
449             {
450               txabort(tid);
451               goto retry;
452             }
453           else
454             {
455               lprintf(1, "cdb_store(%d): %s\n", cdb,
456                       db_strerror(ret));
457               abort();
458             }
459         }
460       else
461         {
462           txcommit(tid);
463           return ret;
464         }
465     }
466 }
467
468
469 /*
470  * Delete a piece of data.  Returns 0 if the operation was successful.
471  */
472 int cdb_delete(int cdb, void *key, int keylen)
473 {
474
475   DBT dkey;
476   DB_TXN *tid;
477   int ret;
478
479   memset(&dkey, 0, sizeof dkey);
480   dkey.size = keylen;
481   dkey.data = key;
482
483   if (MYTID != NULL)
484     {
485       ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
486       if (ret)
487         {
488           lprintf(1, "cdb_delete(%d): %s\n", cdb,
489                   db_strerror(ret));
490           if (ret != DB_NOTFOUND)
491             abort();
492         }
493     }
494   else
495     {
496       bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
497     
498     retry:
499       txbegin(&tid);
500     
501       if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
502           && ret != DB_NOTFOUND)
503         {
504           if (ret == DB_LOCK_DEADLOCK)
505             {
506               txabort(tid);
507               goto retry;
508             }
509           else
510             {
511               lprintf(1, "cdb_delete(%d): %s\n", cdb,
512                       db_strerror(ret));
513               abort();
514             }
515         }
516       else
517         {
518           txcommit(tid);
519         }
520     }
521   return ret;
522 }
523
524 static DBC *localcursor(int cdb)
525 {
526   int ret;
527   DBC *curs;
528
529   if (MYCURSORS[cdb] == NULL)
530     ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
531   else
532     ret = MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, DB_POSITION);
533
534   if (ret)
535     {
536       lprintf(1, "localcursor: %s\n", db_strerror(ret));
537       abort();
538     }
539
540   return curs;
541 }
542
543
544 /*
545  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
546  * a struct cdbdata which it is the caller's responsibility to free later on
547  * using the cdb_free() routine.
548  */
549 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
550 {
551
552   struct cdbdata *tempcdb;
553   DBT dkey, dret;
554   int ret;
555
556   memset(&dkey, 0, sizeof(DBT));
557   dkey.size = keylen;
558   dkey.data = key;
559
560   if (MYTID != NULL)
561     {
562       memset(&dret, 0, sizeof(DBT));
563       dret.flags = DB_DBT_MALLOC;
564       ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
565     }
566   else
567     {
568       DBC *curs;
569
570       do
571         {
572           memset(&dret, 0, sizeof(DBT));
573           dret.flags = DB_DBT_MALLOC;
574
575           curs = localcursor(cdb);
576
577           ret = curs->c_get(curs, &dkey, &dret, DB_SET);
578           cclose(curs);
579         }
580       while (ret == DB_LOCK_DEADLOCK);
581
582     }
583
584   if ((ret != 0) && (ret != DB_NOTFOUND))
585     {
586       lprintf(1, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
587       abort();
588     }
589
590   if (ret != 0) return NULL;
591   tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
592
593   if (tempcdb == NULL)
594     {
595       lprintf(2, "cdb_fetch: Cannot allocate memory for tempcdb\n");
596       abort();
597     }
598
599   tempcdb->len = dret.size;
600   tempcdb->ptr = dret.data;
601   return (tempcdb);
602 }
603
604
605 /*
606  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
607  * more complex stuff with other database managers in the future).
608  */
609 void cdb_free(struct cdbdata *cdb)
610 {
611         phree(cdb->ptr);
612         phree(cdb);
613 }
614
615 void cdb_close_cursor(int cdb)
616 {
617         if (MYCURSORS[cdb] != NULL)
618                 cclose(MYCURSORS[cdb]);
619
620         MYCURSORS[cdb] = NULL;
621 }
622
623 /* 
624  * Prepare for a sequential search of an entire database.
625  * (There is guaranteed to be no more than one traversal in
626  * progress per thread at any given time.)
627  */
628 void cdb_rewind(int cdb)
629 {
630         int ret = 0;
631
632         if (MYCURSORS[cdb] != NULL)
633                 cclose(MYCURSORS[cdb]);
634
635         /*
636          * Now initialize the cursor
637          */
638         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
639         if (ret) {
640                 lprintf(1, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
641                 abort();
642         }
643 }
644
645
646 /*
647  * Fetch the next item in a sequential search.  Returns a pointer to a 
648  * cdbdata structure, or NULL if we've hit the end.
649  */
650 struct cdbdata *cdb_next_item(int cdb)
651 {
652         DBT key, data;
653         struct cdbdata *cdbret;
654         int ret = 0;
655
656         /* Initialize the key/data pair so the flags aren't set. */
657         memset(&key, 0, sizeof(key));
658         memset(&data, 0, sizeof(data));
659         data.flags = DB_DBT_MALLOC;
660
661         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb],
662                 &key, &data, DB_NEXT);
663         
664         if (ret) {
665                 if (ret != DB_NOTFOUND) {
666                         lprintf(1, "cdb_next_item(%d): %s\n",
667                                 cdb, db_strerror(ret));
668                         abort();
669                 }
670                 cclose(MYCURSORS[cdb]);
671                 MYCURSORS[cdb] = NULL;
672                 return NULL;            /* presumably, end of file */
673         }
674
675         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
676         cdbret->len = data.size;
677         cdbret->ptr = data.data;
678
679         return (cdbret);
680 }
681
682
683
684 /*
685  * Transaction-based stuff.  I'm writing this as I bake cookies...
686  */
687
688 void cdb_begin_transaction(void) {
689
690   bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
691
692   if (MYTID != NULL)
693     {
694       lprintf(1, "cdb_begin_transaction: ERROR: nested transaction\n");
695       abort();
696     }
697
698   txbegin(&MYTID);
699 }
700
701 void cdb_end_transaction(void) {
702   int i;
703
704   for (i = 0; i < MAXCDB; i++)
705     if (MYCURSORS[i] != NULL) {
706       lprintf(1, "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n", i);
707       cclose(MYCURSORS[i]);
708       MYCURSORS[i] = NULL;
709     }
710
711   if (MYTID == NULL)
712     {
713       lprintf(1, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
714       abort();
715     }
716   else
717     txcommit(MYTID);
718
719   MYTID = NULL;
720 }
721
722 /*
723  * Truncate (delete every record)
724  */
725 void cdb_trunc(int cdb)
726 {
727   DB_TXN *tid;
728   int ret;
729 #if DB_VERSION_MAJOR > 3 || DB_VERSION_MINOR > 2
730   u_int32_t count;
731 #endif
732   
733   if (MYTID != NULL)
734     {
735       lprintf(1, "cdb_trunc must not be called in a transaction.\n");
736       abort();
737     }
738   else
739     {
740       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
741       
742 #if DB_VERSION_MAJOR == 3 && DB_VERSION_MINOR < 3
743       for (;;)
744         {
745           DBT key, data;
746
747           /* Initialize the key/data pair so the flags aren't set. */
748           memset(&key, 0, sizeof(key));
749           memset(&data, 0, sizeof(data));
750
751           txbegin(&tid);
752           
753           ret = dbp[cdb]->cursor(dbp[cdb], tid, &MYCURSORS[cdb], 0);
754           if (ret)
755             {
756               lprintf(1, "cdb_trunc: db_cursor: %s\n", db_strerror(ret));
757               abort();
758             }
759
760           ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb],
761                                       &key, &data, DB_NEXT);
762           if (ret)
763             {
764               cclose(MYCURSORS[cdb]);
765               txabort(tid);
766               if (ret == DB_LOCK_DEADLOCK)
767                 continue;
768
769               if (ret == DB_NOTFOUND)
770                 break;
771
772               lprintf(1, "cdb_trunc: c_get: %s\n", db_strerror(ret));
773               abort();
774             }
775
776           ret = MYCURSORS[cdb]->c_del(MYCURSORS[cdb], 0);
777           if (ret)
778             {
779               cclose(MYCURSORS[cdb]);
780               txabort(tid);
781               if (ret == DB_LOCK_DEADLOCK)
782                 continue;
783
784               lprintf(1, "cdb_trunc: c_del: %s\n", db_strerror(ret));
785               abort();
786             }
787
788           cclose(MYCURSORS[cdb]);
789           txcommit(tid);
790         }
791 #else
792     retry:
793       txbegin(&tid);
794       
795       if ((ret = dbp[cdb]->truncate(dbp[cdb],    /* db */
796                                     tid,         /* transaction ID */
797                                     &count,      /* #rows deleted */
798                                     0)))         /* flags */
799         {
800           if (ret == DB_LOCK_DEADLOCK)
801             {
802               txabort(tid);
803               goto retry;
804             }
805           else
806             {
807               lprintf(1, "cdb_truncate(%d): %s\n", cdb,
808                       db_strerror(ret));
809               abort();
810             }
811         }
812       else
813         {
814           txcommit(tid);
815         }
816 #endif
817     }
818 }