* more merges
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   0
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <db.h>
30 #include <pthread.h>
31 #include "citadel.h"
32 #include "server.h"
33 #include "citserver.h"
34 #include "database.h"
35 #include "sysdep_decls.h"
36 #include "dynloader.h"
37
38 DB *dbp[MAXCDB];                /* One DB handle for each Citadel database */
39 DB_ENV *dbenv;                  /* The DB environment (global) */
40
41 struct cdbtsd {                 /* Thread-specific DB stuff */
42         DB_TXN *tid;            /* Transaction handle */
43         DBC *cursor;            /* Cursor, for traversals... */
44 };
45
46 int num_ssd = 0;
47
48 static pthread_key_t tsdkey;
49
50 #define MYCURSOR        (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursor)
51 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
52
53 /* just a little helper function */
54 static int txabort(DB_TXN *tid) {
55         int ret = txn_abort(tid);
56
57         if (ret)
58                 lprintf(1, "txn_abort: %s\n", db_strerror(ret));
59
60         return ret;
61 }
62
63 /* this one is even more helpful than the last. */
64 static int txcommit(DB_TXN *tid) {
65         int ret = txn_commit(tid, 0);
66
67         if (ret)
68                 lprintf(1, "txn_commit: %s\n", db_strerror(ret));
69
70         return ret;
71 }
72
73 /* are you sensing a pattern yet? */
74 static int txbegin(DB_TXN **tid) {
75         int ret = txn_begin(dbenv, NULL, tid, 0);
76
77         if (ret)
78                 lprintf(1, "txn_begin: %s\n", db_strerror(ret));
79
80         return ret;
81 }
82
83 static void release_handles(void *arg) {
84         if (arg != NULL) {
85                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
86
87                 if (tsd->cursor != NULL) {
88                         lprintf(1, "WARNING: cursor still in progress; "
89                                 "closing!\n");
90                         tsd->cursor->c_close(tsd->cursor);
91                 }
92
93                 if (tsd->tid != NULL) {
94                         lprintf(1, "ERROR: transaction still in progress; "
95                                 "aborting!\n");
96                         txabort(tsd->tid);
97                 }
98         }
99 }
100
101 static void dest_tsd(void *arg) {
102         if (arg != NULL) {
103                 release_handles(arg);
104                 phree(arg);
105         }
106 }
107
108 /*
109  * Ensure that we have a key for thread-specific data.  We don't
110  * put anything in here that Citadel cares about; this is just database
111  * related stuff like cursors and transactions.
112  *
113  * This should be called immediately after startup by any thread which wants
114  * to use database calls, except for whatever thread calls open_databases.
115  */
116 void cdb_allocate_tsd(void) {
117         struct cdbtsd *tsd = mallok(sizeof *tsd);
118
119         tsd->tid = NULL;
120         tsd->cursor = NULL;
121         pthread_setspecific(tsdkey, tsd);
122 }
123
124 void cdb_free_tsd(void) {
125         dest_tsd(pthread_getspecific(tsdkey));
126         pthread_setspecific(tsdkey, NULL);
127 }
128
129 void cdb_release_handles(void) {
130         release_handles(pthread_getspecific(tsdkey));
131 }
132
133
134 /*
135  * Reclaim unused space in the databases.  We need to do each one of
136  * these discretely, rather than in a loop.
137  *
138  * This is a stub function in the Sleepycat DB backend, because there is no
139  * such API call available.
140  */
141 void defrag_databases(void)
142 {
143         /* do nothing */
144 }
145
146
147
148 /*
149  * Request a checkpoint of the database.
150  */
151 static void cdb_checkpoint(void) {
152         int ret;
153
154         ret = txn_checkpoint(dbenv,
155                                 MAX_CHECKPOINT_KBYTES,
156                                 MAX_CHECKPOINT_MINUTES,
157                                 0);
158         if (ret) {
159                 lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret));
160         }
161 }
162
163 /*
164  * Open the various databases we'll be using.  Any database which
165  * does not exist should be created.  Note that we don't need an S_DATABASE
166  * critical section here, because there aren't any active threads manipulating
167  * the database yet -- and besides, it causes problems on BSDI.
168  */
169 void open_databases(void)
170 {
171         int ret;
172         int i;
173         char dbfilename[SIZ];
174         u_int32_t flags = 0;
175
176         lprintf(9, "open_databases() starting\n");
177         /*
178          * Silently try to create the database subdirectory.  If it's
179          * already there, no problem.
180          */
181         system("exec mkdir data 2>/dev/null");
182
183         lprintf(9, "Setting up DB environment\n");
184         db_env_set_func_yield(sched_yield);
185         ret = db_env_create(&dbenv, 0);
186         if (ret) {
187                 lprintf(1, "db_env_create: %s\n", db_strerror(ret));
188                 exit(ret);
189         }
190         dbenv->set_errpfx(dbenv, "citserver");
191
192         /*
193          * We want to specify the shared memory buffer pool cachesize,
194          * but everything else is the default.
195          */
196         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
197         if (ret) {
198                 lprintf(1, "set_cachesize: %s\n", db_strerror(ret));
199                 dbenv->close(dbenv, 0);
200                 exit(ret);
201         }
202
203         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
204                 lprintf(1, "set_lk_detect: %s\n", db_strerror(ret));
205                 dbenv->close(dbenv, 0);
206                 exit(ret);
207         }
208
209         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
210                 DB_INIT_LOCK|DB_THREAD;
211         ret = dbenv->open(dbenv, "./data", flags, 0);
212         if (ret) {
213                 lprintf(1, "dbenv->open: %s\n", db_strerror(ret));
214                 dbenv->close(dbenv, 0);
215                 exit(ret);
216         }
217
218         lprintf(7, "Starting up DB\n");
219
220         for (i = 0; i < MAXCDB; ++i) {
221
222                 /* Create a database handle */
223                 ret = db_create(&dbp[i], dbenv, 0);
224                 if (ret) {
225                         lprintf(1, "db_create: %s\n", db_strerror(ret));
226                         exit(ret);
227                 }
228
229
230                 /* Arbitrary names for our tables -- we reference them by
231                  * number, so we don't have string names for them.
232                  */
233                 sprintf(dbfilename, "cdb.%02x", i);
234
235                 ret = dbp[i]->open(dbp[i],
236                                 dbfilename,
237                                 NULL,
238                                 DB_BTREE,
239                                 DB_CREATE,
240                                 0600);
241                 if (ret) {
242                         lprintf(1, "db_open[%d]: %s\n", i, db_strerror(ret));
243                         exit(ret);
244                 }
245         }
246
247         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
248                 lprintf(1, "pthread_key_create: %s\n", strerror(ret));
249                 exit(1);
250         }
251
252         cdb_allocate_tsd();
253         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
254         lprintf(9, "open_databases() finished\n");
255 }
256
257
258 /*
259  * Close all of the gdbm database files we've opened.  This can be done
260  * in a loop, since it's just a bunch of closes.
261  */
262 void close_databases(void)
263 {
264         int a;
265         int ret;
266
267         cdb_free_tsd();
268
269         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
270                 lprintf(1, "txn_checkpoint: %s\n", db_strerror(ret));
271         }
272
273         for (a = 0; a < MAXCDB; ++a) {
274                 lprintf(7, "Closing database %d\n", a);
275                 ret = dbp[a]->close(dbp[a], 0);
276                 if (ret) {
277                         lprintf(1, "db_close: %s\n", db_strerror(ret));
278                 }
279                 
280         }
281
282         /* Close the handle. */
283         ret = dbenv->close(dbenv, 0);
284         if (ret) {
285                 lprintf(1, "DBENV->close: %s\n", db_strerror(ret));
286         }
287 }
288
289 /*
290  * Store a piece of data.  Returns 0 if the operation was successful.  If a
291  * key already exists it should be overwritten.
292  */
293 int cdb_store(int cdb,
294               void *ckey, int ckeylen,
295               void *cdata, int cdatalen)
296 {
297
298         DBT dkey, ddata;
299         DB_TXN *tid;
300         int ret;
301
302         memset(&dkey, 0, sizeof(DBT));
303         memset(&ddata, 0, sizeof(DBT));
304         dkey.size = ckeylen;
305         dkey.data = ckey;
306         ddata.size = cdatalen;
307         ddata.data = cdata;
308
309         if (MYTID != NULL) {
310                 ret = dbp[cdb]->put(dbp[cdb],           /* db */
311                                         MYTID,          /* transaction ID */
312                                         &dkey,          /* key */
313                                         &ddata,         /* data */
314                                         0);             /* flags */
315                 if (ret) {
316                         lprintf(1, "cdb_store(%d): %s\n", cdb,
317                                 db_strerror(ret));
318                 }
319                 return ret;
320         } else {
321             retry:
322                 if (txbegin(&tid))
323                         return -1;
324
325                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
326                                          tid,           /* transaction ID */
327                                          &dkey,         /* key */
328                                          &ddata,        /* data */
329                                          0))) {         /* flags */
330                         if (ret == DB_LOCK_DEADLOCK) {
331                                 if (txabort(tid))
332                                         return ret;
333                                 else
334                                         goto retry;
335                         } else {
336                                 lprintf(1, "cdb_store(%d): %s\n", cdb,
337                                         db_strerror(ret));
338                                 txabort(tid);
339                                 return ret;
340                         }
341                 } else {
342                         return txcommit(tid);
343                 }
344         }
345 }
346
347
348 /*
349  * Delete a piece of data.  Returns 0 if the operation was successful.
350  */
351 int cdb_delete(int cdb, void *key, int keylen)
352 {
353
354         DBT dkey;
355         DB_TXN *tid;
356         int ret;
357
358         dkey.size = keylen;
359         dkey.data = key;
360
361         if (MYTID != NULL) {
362                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
363                 return (ret);
364         } else {
365             retry:
366                 if (txbegin(&tid))
367                         return -1;
368
369                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))) {
370                         if (ret == DB_LOCK_DEADLOCK) {
371                                         if (txabort(tid))
372                                                 return ret;
373                                         else
374                                                 goto retry;
375                         } else {
376                                 lprintf(1, "cdb_store(%d): %s\n", cdb,
377                                         db_strerror(ret));
378                                 txabort(tid);
379                                 return ret;
380                         }
381                 } else {
382                         return txcommit(tid);
383                 }
384         }
385 }
386
387
388
389
390 /*
391  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
392  * a struct cdbdata which it is the caller's responsibility to free later on
393  * using the cdb_free() routine.
394  */
395 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
396 {
397
398         struct cdbdata *tempcdb;
399         DBT dkey, dret;
400         DB_TXN *tid;
401         int ret;
402
403         memset(&dkey, 0, sizeof(DBT));
404         memset(&dret, 0, sizeof(DBT));
405         dkey.size = keylen;
406         dkey.data = key;
407         dret.flags = DB_DBT_MALLOC;
408
409         if (MYTID != NULL) {
410                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
411         } else {
412             retry:
413                 if (txbegin(&tid))
414                         return NULL;
415
416                 ret = dbp[cdb]->get(dbp[cdb], tid, &dkey, &dret, 0);
417
418                 if (ret == DB_LOCK_DEADLOCK) {
419                         if (txabort(tid))
420                                 return NULL;
421                         else
422                                 goto retry;
423                 } else if (txcommit(tid))
424                         return NULL;
425         }
426
427         if ((ret != 0) && (ret != DB_NOTFOUND)) {
428                 lprintf(1, "cdb_fetch: %s\n", db_strerror(ret));
429         }
430         if (ret != 0) return NULL;
431         tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
432         if (tempcdb == NULL) {
433                 lprintf(2, "Cannot allocate memory!\n");
434         }
435         tempcdb->len = dret.size;
436         tempcdb->ptr = dret.data;
437         return (tempcdb);
438 }
439
440
441 /*
442  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
443  * more complex stuff with other database managers in the future).
444  */
445 void cdb_free(struct cdbdata *cdb)
446 {
447         phree(cdb->ptr);
448         phree(cdb);
449 }
450
451
452 /* 
453  * Prepare for a sequential search of an entire database.
454  * (There is guaranteed to be no more than one traversal in
455  * progress per thread at any given time.)
456  */
457 void cdb_rewind(int cdb)
458 {
459         int ret = 0;
460
461         if (MYCURSOR != NULL)
462                 MYCURSOR->c_close(MYCURSOR);
463
464         if (MYTID == NULL) {
465                 lprintf(1, "ERROR: cursor use outside transaction\n");
466                 abort();
467         }
468
469         /*
470          * Now initialize the cursor
471          */
472         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0);
473         if (ret) {
474                 lprintf(1, "db_cursor: %s\n", db_strerror(ret));
475         }
476 }
477
478
479 /*
480  * Fetch the next item in a sequential search.  Returns a pointer to a 
481  * cdbdata structure, or NULL if we've hit the end.
482  */
483 struct cdbdata *cdb_next_item(int cdb)
484 {
485         DBT key, data;
486         struct cdbdata *cdbret;
487         int ret = 0;
488
489         /* Initialize the key/data pair so the flags aren't set. */
490         memset(&key, 0, sizeof(key));
491         memset(&data, 0, sizeof(data));
492         data.flags = DB_DBT_MALLOC;
493
494         ret = MYCURSOR->c_get(MYCURSOR,
495                 &key, &data, DB_NEXT);
496         
497         if (ret) {
498                 MYCURSOR->c_close(MYCURSOR);
499                 MYCURSOR = NULL;
500                 return NULL;            /* presumably, end of file */
501         }
502
503         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
504         cdbret->len = data.size;
505         cdbret->ptr = data.data;
506
507         return (cdbret);
508 }
509
510
511 /*
512  * Transaction-based stuff.  I'm writing this as I bake cookies...
513  */
514
515 void cdb_begin_transaction(void) {
516
517         if (MYTID != NULL) {    /* FIXME this slows it down, take it out */
518                 lprintf(1, "ERROR: opening a new transaction with one already open!\n");
519                 abort();
520         }
521         else {
522                 txbegin(&MYTID);
523         }
524 }
525
526 void cdb_end_transaction(void) {
527         if (MYCURSOR != NULL) {
528                 lprintf(1, "WARNING: cursor still open at transaction end\n");
529                 MYCURSOR->c_close(MYCURSOR);
530                 MYCURSOR = NULL;
531         }
532         if (MYTID == NULL) lprintf(1, "ERROR: txcommit(NULL) !!\n");
533         else txcommit(MYTID);
534         MYTID = NULL;
535 }
536