database-related cleanups and paranoia tests;
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   0
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <db.h>
30 #include <pthread.h>
31 #include "citadel.h"
32 #include "server.h"
33 #include "citserver.h"
34 #include "database.h"
35 #include "sysdep_decls.h"
36 #include "dynloader.h"
37
38 DB *dbp[MAXCDB];                /* One DB handle for each Citadel database */
39 DB_ENV *dbenv;                  /* The DB environment (global) */
40
41 struct cdbtsd {                 /* Thread-specific DB stuff */
42         DB_TXN *tid;            /* Transaction handle */
43         DBC *cursor;            /* Cursor, for traversals... */
44 };
45
46 int num_ssd = 0;
47
48 static pthread_key_t tsdkey;
49
50 #define MYCURSOR        (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursor)
51 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
52
53 /* just a little helper function */
54 static void txabort(DB_TXN *tid) {
55         int ret = txn_abort(tid);
56
57         if (ret) {
58                 lprintf(1, "cdb_*: txn_abort: %s\n", db_strerror(ret));
59                 abort();
60         }
61 }
62
63 /* this one is even more helpful than the last. */
64 static void txcommit(DB_TXN *tid) {
65         int ret = txn_commit(tid, 0);
66
67         if (ret) {
68                 lprintf(1, "cdb_*: txn_commit: %s\n", db_strerror(ret));
69                 abort();
70         }
71 }
72
73 /* are you sensing a pattern yet? */
74 static void txbegin(DB_TXN **tid) {
75         int ret = txn_begin(dbenv, NULL, tid, 0);
76
77         if (ret) {
78                 lprintf(1, "cdb_*: txn_begin: %s\n", db_strerror(ret));
79                 abort();
80         }
81 }
82
83 static void cclose(DBC *cursor) {
84         int ret;
85
86         if ((ret = cursor->c_close(cursor))) {
87                 lprintf(1, "cdb_*: c_close: %s\n", db_strerror(ret));
88                 abort();
89         }
90 }
91
92 static void check_handles(void *arg) {
93         if (arg != NULL) {
94                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
95
96                 if (tsd->cursor != NULL) {
97                         lprintf(1, "cdb_*: cursor still in progress!");
98                         abort();
99                 }
100
101                 if (tsd->tid != NULL) {
102                         lprintf(1, "cdb_*: transaction still in progress!");
103                         abort();
104                 }
105         }
106 }
107
108 static void dest_tsd(void *arg) {
109         if (arg != NULL) {
110                 check_handles(arg);
111                 phree(arg);
112         }
113 }
114
115 /*
116  * Ensure that we have a key for thread-specific data.  We don't
117  * put anything in here that Citadel cares about; this is just database
118  * related stuff like cursors and transactions.
119  *
120  * This should be called immediately after startup by any thread which wants
121  * to use database calls, except for whatever thread calls open_databases.
122  */
123 void cdb_allocate_tsd(void) {
124         struct cdbtsd *tsd;
125
126         if (pthread_getspecific(tsdkey) != NULL)
127                 return;
128
129         tsd = mallok(sizeof *tsd);
130
131         tsd->tid = NULL;
132         tsd->cursor = NULL;
133         pthread_setspecific(tsdkey, tsd);
134 }
135
136 void cdb_free_tsd(void) {
137         dest_tsd(pthread_getspecific(tsdkey));
138         pthread_setspecific(tsdkey, NULL);
139 }
140
141 void cdb_check_handles(void) {
142         check_handles(pthread_getspecific(tsdkey));
143 }
144
145
146 /*
147  * Reclaim unused space in the databases.  We need to do each one of
148  * these discretely, rather than in a loop.
149  *
150  * This is a stub function in the Sleepycat DB backend, because there is no
151  * such API call available.
152  */
153 void defrag_databases(void)
154 {
155         /* do nothing */
156 }
157
158
159
160 /*
161  * Request a checkpoint of the database.
162  */
163 static void cdb_checkpoint(void) {
164         int ret;
165
166         ret = txn_checkpoint(dbenv,
167                                 MAX_CHECKPOINT_KBYTES,
168                                 MAX_CHECKPOINT_MINUTES,
169                                 0);
170         if (ret) {
171                 lprintf(1, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
172                 abort();
173         }
174 }
175
176 /*
177  * Open the various databases we'll be using.  Any database which
178  * does not exist should be created.  Note that we don't need an S_DATABASE
179  * critical section here, because there aren't any active threads manipulating
180  * the database yet -- and besides, it causes problems on BSDI.
181  */
182 void open_databases(void)
183 {
184         int ret;
185         int i;
186         char dbfilename[SIZ];
187         u_int32_t flags = 0;
188
189         lprintf(9, "cdb_*: open_databases() starting\n");
190         /*
191          * Silently try to create the database subdirectory.  If it's
192          * already there, no problem.
193          */
194         system("exec mkdir data 2>/dev/null");
195
196         lprintf(9, "cdb_*: Setting up DB environment\n");
197         db_env_set_func_yield(sched_yield);
198         ret = db_env_create(&dbenv, 0);
199         if (ret) {
200                 lprintf(1, "cdb_*: db_env_create: %s\n", db_strerror(ret));
201                 exit(ret);
202         }
203         dbenv->set_errpfx(dbenv, "citserver");
204
205         /*
206          * We want to specify the shared memory buffer pool cachesize,
207          * but everything else is the default.
208          */
209         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
210         if (ret) {
211                 lprintf(1, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
212                 dbenv->close(dbenv, 0);
213                 exit(ret);
214         }
215
216         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
217                 lprintf(1, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
218                 dbenv->close(dbenv, 0);
219                 exit(ret);
220         }
221
222         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
223                 DB_INIT_LOCK|DB_THREAD;
224         ret = dbenv->open(dbenv, "./data", flags, 0);
225         if (ret) {
226                 lprintf(1, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
227                 dbenv->close(dbenv, 0);
228                 exit(ret);
229         }
230
231         lprintf(7, "cdb_*: Starting up DB\n");
232
233         for (i = 0; i < MAXCDB; ++i) {
234
235                 /* Create a database handle */
236                 ret = db_create(&dbp[i], dbenv, 0);
237                 if (ret) {
238                         lprintf(1, "cdb_*: db_create: %s\n", db_strerror(ret));
239                         exit(ret);
240                 }
241
242
243                 /* Arbitrary names for our tables -- we reference them by
244                  * number, so we don't have string names for them.
245                  */
246                 sprintf(dbfilename, "cdb.%02x", i);
247
248                 ret = dbp[i]->open(dbp[i],
249                                 dbfilename,
250                                 NULL,
251                                 DB_BTREE,
252                                 DB_CREATE|DB_THREAD,
253                                 0600);
254                 if (ret) {
255                         lprintf(1, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
256                         exit(ret);
257                 }
258         }
259
260         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
261                 lprintf(1, "cdb_*: pthread_key_create: %s\n", strerror(ret));
262                 exit(1);
263         }
264
265         cdb_allocate_tsd();
266         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
267         lprintf(9, "cdb_*: open_databases() finished\n");
268 }
269
270
271 /*
272  * Close all of the db database files we've opened.  This can be done
273  * in a loop, since it's just a bunch of closes.
274  */
275 void close_databases(void)
276 {
277         int a;
278         int ret;
279
280         cdb_free_tsd();
281
282         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
283                 lprintf(1, "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
284                 abort();
285         }
286
287         for (a = 0; a < MAXCDB; ++a) {
288                 lprintf(7, "cdb_*: Closing database %d\n", a);
289                 ret = dbp[a]->close(dbp[a], 0);
290                 if (ret) {
291                         lprintf(1, "cdb_*: db_close: %s\n", db_strerror(ret));
292                         abort();
293                 }
294                 
295         }
296
297         /* Close the handle. */
298         ret = dbenv->close(dbenv, 0);
299         if (ret) {
300                 lprintf(1, "cdb_*: DBENV->close: %s\n", db_strerror(ret));
301                 abort();
302         }
303 }
304
305 /*
306  * Store a piece of data.  Returns 0 if the operation was successful.  If a
307  * key already exists it should be overwritten.
308  */
309 int cdb_store(int cdb,
310               void *ckey, int ckeylen,
311               void *cdata, int cdatalen)
312 {
313
314         DBT dkey, ddata;
315         DB_TXN *tid;
316         int ret;
317
318         memset(&dkey, 0, sizeof(DBT));
319         memset(&ddata, 0, sizeof(DBT));
320         dkey.size = ckeylen;
321         dkey.data = ckey;
322         ddata.size = cdatalen;
323         ddata.data = cdata;
324
325         if (MYTID != NULL) {
326                 ret = dbp[cdb]->put(dbp[cdb],           /* db */
327                                         MYTID,          /* transaction ID */
328                                         &dkey,          /* key */
329                                         &ddata,         /* data */
330                                         0);             /* flags */
331                 if (ret) {
332                         lprintf(1, "cdb_store(%d): %s\n", cdb,
333                                 db_strerror(ret));
334                         abort();
335                 }
336                 return ret;
337         } else {
338             retry:
339                 txbegin(&tid);
340
341                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
342                                          tid,           /* transaction ID */
343                                          &dkey,         /* key */
344                                          &ddata,        /* data */
345                                          0))) {         /* flags */
346                         if (ret == DB_LOCK_DEADLOCK) {
347                                 txabort(tid);
348                                 goto retry;
349                         } else {
350                                 lprintf(1, "cdb_store(%d): %s\n", cdb,
351                                         db_strerror(ret));
352                                 abort();
353                         }
354                 } else {
355                         txcommit(tid);
356                         return ret;
357                 }
358         }
359 }
360
361
362 /*
363  * Delete a piece of data.  Returns 0 if the operation was successful.
364  */
365 int cdb_delete(int cdb, void *key, int keylen)
366 {
367
368         DBT dkey;
369         DB_TXN *tid;
370         int ret;
371
372         memset(&dkey, 0, sizeof dkey);
373         dkey.size = keylen;
374         dkey.data = key;
375
376         if (MYTID != NULL) {
377                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
378                 if (ret) {
379                         lprintf(1, "cdb_delete(%d): %s\n", cdb,
380                                 db_strerror(ret));
381                         if (ret != DB_NOTFOUND)
382                                 abort();
383                 }
384         } else {
385             retry:
386                 txbegin(&tid);
387
388                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
389                     && ret != DB_NOTFOUND) {
390                         if (ret == DB_LOCK_DEADLOCK) {
391                                         txabort(tid);
392                                         goto retry;
393                         } else {
394                                 lprintf(1, "cdb_delete(%d): %s\n", cdb,
395                                         db_strerror(ret));
396                                 abort();
397                         }
398                 } else {
399                         txcommit(tid);
400                 }
401         }
402         return ret;
403 }
404
405
406
407
408 /*
409  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
410  * a struct cdbdata which it is the caller's responsibility to free later on
411  * using the cdb_free() routine.
412  */
413 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
414 {
415
416         struct cdbdata *tempcdb;
417         DBT dkey, dret;
418         DB_TXN *tid;
419         int ret;
420
421         memset(&dkey, 0, sizeof(DBT));
422         memset(&dret, 0, sizeof(DBT));
423         dkey.size = keylen;
424         dkey.data = key;
425         dret.flags = DB_DBT_MALLOC;
426
427         if (MYTID != NULL) {
428                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
429         } else {
430             retry:
431                 txbegin(&tid);
432
433                 ret = dbp[cdb]->get(dbp[cdb], tid, &dkey, &dret, 0);
434
435                 if (ret == DB_LOCK_DEADLOCK) {
436                         txabort(tid);
437                         goto retry;
438                 }
439                 if (ret && ret != DB_NOTFOUND)
440                         abort();
441
442                 txcommit(tid);
443         }
444
445         if ((ret != 0) && (ret != DB_NOTFOUND)) {
446                 lprintf(1, "cdb_fetch: %s\n", db_strerror(ret));
447                 abort();
448         }
449         if (ret != 0) return NULL;
450         tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
451         if (tempcdb == NULL) {
452                 lprintf(2, "cdb_fetch: Cannot allocate memory!\n");
453                 abort();
454         }
455         tempcdb->len = dret.size;
456         tempcdb->ptr = dret.data;
457         return (tempcdb);
458 }
459
460
461 /*
462  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
463  * more complex stuff with other database managers in the future).
464  */
465 void cdb_free(struct cdbdata *cdb)
466 {
467         phree(cdb->ptr);
468         phree(cdb);
469 }
470
471
472 /* 
473  * Prepare for a sequential search of an entire database.
474  * (There is guaranteed to be no more than one traversal in
475  * progress per thread at any given time.)
476  */
477 void cdb_rewind(int cdb)
478 {
479         int ret = 0;
480
481         if (MYCURSOR != NULL)
482                 cclose(MYCURSOR);
483
484         if (MYTID == NULL) {
485                 lprintf(1, "cdb_rewind: ERROR: cursor use outside transaction\n");
486                 abort();
487         }
488
489         /*
490          * Now initialize the cursor
491          */
492         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0);
493         if (ret) {
494                 lprintf(1, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
495                 abort();
496         }
497 }
498
499
500 /*
501  * Fetch the next item in a sequential search.  Returns a pointer to a 
502  * cdbdata structure, or NULL if we've hit the end.
503  */
504 struct cdbdata *cdb_next_item(int cdb)
505 {
506         DBT key, data;
507         struct cdbdata *cdbret;
508         int ret = 0;
509
510         /* Initialize the key/data pair so the flags aren't set. */
511         memset(&key, 0, sizeof(key));
512         memset(&data, 0, sizeof(data));
513         data.flags = DB_DBT_MALLOC;
514
515         ret = MYCURSOR->c_get(MYCURSOR,
516                 &key, &data, DB_NEXT);
517         
518         if (ret) {
519                 if (ret != DB_NOTFOUND) {
520                         lprintf(1, "cdb_next_item(%d): %s\n",
521                                 cdb, db_strerror(ret));
522                         abort();
523                 }
524                 cclose(MYCURSOR);
525                 MYCURSOR = NULL;
526                 return NULL;            /* presumably, end of file */
527         }
528
529         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
530         cdbret->len = data.size;
531         cdbret->ptr = data.data;
532
533         return (cdbret);
534 }
535
536
537 /*
538  * Transaction-based stuff.  I'm writing this as I bake cookies...
539  */
540
541 void cdb_begin_transaction(void) {
542
543         if (MYTID != NULL) {    /* FIXME this slows it down, take it out */
544                 lprintf(1, "cdb_begin_transaction: ERROR: opening a new transaction with one already open!\n");
545                 abort();
546         }
547         else {
548                 txbegin(&MYTID);
549         }
550 }
551
552 void cdb_end_transaction(void) {
553         if (MYCURSOR != NULL) {
554                 lprintf(1, "cdb_end_transaction: WARNING: cursor still open at transaction end\n");
555                 cclose(MYCURSOR);
556                 MYCURSOR = NULL;
557         }
558         if (MYTID == NULL) {
559                 lprintf(1, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
560                 abort();
561         } else
562                 txcommit(MYTID);
563
564         MYTID = NULL;
565 }
566