57005be2845f83ea9fc9edbae08eab55bc91ac91
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   0
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <db.h>
30 #include <pthread.h>
31 #include "citadel.h"
32 #include "server.h"
33 #include "citserver.h"
34 #include "database.h"
35 #include "sysdep_decls.h"
36 #include "dynloader.h"
37
38 DB *dbp[MAXCDB];                /* One DB handle for each Citadel database */
39 DB_ENV *dbenv;                  /* The DB environment (global) */
40
41 struct cdbtsd {                 /* Thread-specific DB stuff */
42         DB_TXN *tid;            /* Transaction handle */
43         DBC *cursor;            /* Cursor, for traversals... */
44 };
45
46 int num_ssd = 0;
47
48 static pthread_key_t tsdkey;
49
50 #define MYCURSOR        (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursor)
51 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
52
53 /* just a little helper function */
54 static int txabort(DB_TXN *tid) {
55         int ret = txn_abort(tid);
56
57         if (ret) {
58                 lprintf(1, "cdb_*: txn_abort: %s\n", db_strerror(ret));
59                 abort();
60         }
61
62         return ret;
63 }
64
65 /* this one is even more helpful than the last. */
66 static int txcommit(DB_TXN *tid) {
67         int ret = txn_commit(tid, 0);
68
69         if (ret) {
70                 lprintf(1, "cdb_*: txn_commit: %s\n", db_strerror(ret));
71                 abort();
72         }
73
74         return ret;
75 }
76
77 /* are you sensing a pattern yet? */
78 static int txbegin(DB_TXN **tid) {
79         int ret = txn_begin(dbenv, NULL, tid, 0);
80
81         if (ret) {
82                 lprintf(1, "cdb_*: txn_begin: %s\n", db_strerror(ret));
83                 abort();
84         }
85
86         return ret;
87 }
88
89 static void release_handles(void *arg) {
90         if (arg != NULL) {
91                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
92
93                 if (tsd->cursor != NULL) {
94                         lprintf(1, "cdb_*: WARNING: cursor still in progress; "
95                                 "closing!\n");
96                         tsd->cursor->c_close(tsd->cursor);
97                 }
98
99                 if (tsd->tid != NULL) {
100                         lprintf(1, "cdb_*: ERROR: transaction still in progress; "
101                                 "aborting!\n");
102                         txabort(tsd->tid);
103                 }
104         }
105 }
106
107 static void dest_tsd(void *arg) {
108         if (arg != NULL) {
109                 release_handles(arg);
110                 phree(arg);
111         }
112 }
113
114 /*
115  * Ensure that we have a key for thread-specific data.  We don't
116  * put anything in here that Citadel cares about; this is just database
117  * related stuff like cursors and transactions.
118  *
119  * This should be called immediately after startup by any thread which wants
120  * to use database calls, except for whatever thread calls open_databases.
121  */
122 void cdb_allocate_tsd(void) {
123         struct cdbtsd *tsd = mallok(sizeof *tsd);
124
125         tsd->tid = NULL;
126         tsd->cursor = NULL;
127         pthread_setspecific(tsdkey, tsd);
128 }
129
130 void cdb_free_tsd(void) {
131         dest_tsd(pthread_getspecific(tsdkey));
132         pthread_setspecific(tsdkey, NULL);
133 }
134
135 void cdb_release_handles(void) {
136         release_handles(pthread_getspecific(tsdkey));
137 }
138
139
140 /*
141  * Reclaim unused space in the databases.  We need to do each one of
142  * these discretely, rather than in a loop.
143  *
144  * This is a stub function in the Sleepycat DB backend, because there is no
145  * such API call available.
146  */
147 void defrag_databases(void)
148 {
149         /* do nothing */
150 }
151
152
153
154 /*
155  * Request a checkpoint of the database.
156  */
157 static void cdb_checkpoint(void) {
158         int ret;
159
160         ret = txn_checkpoint(dbenv,
161                                 MAX_CHECKPOINT_KBYTES,
162                                 MAX_CHECKPOINT_MINUTES,
163                                 0);
164         if (ret) {
165                 lprintf(1, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
166                 abort();
167         }
168 }
169
170 /*
171  * Open the various databases we'll be using.  Any database which
172  * does not exist should be created.  Note that we don't need an S_DATABASE
173  * critical section here, because there aren't any active threads manipulating
174  * the database yet -- and besides, it causes problems on BSDI.
175  */
176 void open_databases(void)
177 {
178         int ret;
179         int i;
180         char dbfilename[SIZ];
181         u_int32_t flags = 0;
182
183         lprintf(9, "cdb_*: open_databases() starting\n");
184         /*
185          * Silently try to create the database subdirectory.  If it's
186          * already there, no problem.
187          */
188         system("exec mkdir data 2>/dev/null");
189
190         lprintf(9, "cdb_*: Setting up DB environment\n");
191         db_env_set_func_yield(sched_yield);
192         ret = db_env_create(&dbenv, 0);
193         if (ret) {
194                 lprintf(1, "cdb_*: db_env_create: %s\n", db_strerror(ret));
195                 exit(ret);
196         }
197         dbenv->set_errpfx(dbenv, "citserver");
198
199         /*
200          * We want to specify the shared memory buffer pool cachesize,
201          * but everything else is the default.
202          */
203         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
204         if (ret) {
205                 lprintf(1, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
206                 dbenv->close(dbenv, 0);
207                 exit(ret);
208         }
209
210         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
211                 lprintf(1, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
212                 dbenv->close(dbenv, 0);
213                 exit(ret);
214         }
215
216         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
217                 DB_INIT_LOCK|DB_THREAD;
218         ret = dbenv->open(dbenv, "./data", flags, 0);
219         if (ret) {
220                 lprintf(1, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
221                 dbenv->close(dbenv, 0);
222                 exit(ret);
223         }
224
225         lprintf(7, "cdb_*: Starting up DB\n");
226
227         for (i = 0; i < MAXCDB; ++i) {
228
229                 /* Create a database handle */
230                 ret = db_create(&dbp[i], dbenv, 0);
231                 if (ret) {
232                         lprintf(1, "cdb_*: db_create: %s\n", db_strerror(ret));
233                         exit(ret);
234                 }
235
236
237                 /* Arbitrary names for our tables -- we reference them by
238                  * number, so we don't have string names for them.
239                  */
240                 sprintf(dbfilename, "cdb.%02x", i);
241
242                 ret = dbp[i]->open(dbp[i],
243                                 dbfilename,
244                                 NULL,
245                                 DB_BTREE,
246                                 DB_CREATE|DB_THREAD,
247                                 0600);
248                 if (ret) {
249                         lprintf(1, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
250                         exit(ret);
251                 }
252         }
253
254         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
255                 lprintf(1, "cdb_*: pthread_key_create: %s\n", strerror(ret));
256                 exit(1);
257         }
258
259         cdb_allocate_tsd();
260         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
261         lprintf(9, "cdb_*: open_databases() finished\n");
262 }
263
264
265 /*
266  * Close all of the db database files we've opened.  This can be done
267  * in a loop, since it's just a bunch of closes.
268  */
269 void close_databases(void)
270 {
271         int a;
272         int ret;
273
274         cdb_free_tsd();
275
276         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
277                 lprintf(1, "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
278                 abort();
279         }
280
281         for (a = 0; a < MAXCDB; ++a) {
282                 lprintf(7, "cdb_*: Closing database %d\n", a);
283                 ret = dbp[a]->close(dbp[a], 0);
284                 if (ret) {
285                         lprintf(1, "cdb_*: db_close: %s\n", db_strerror(ret));
286                         abort();
287                 }
288                 
289         }
290
291         /* Close the handle. */
292         ret = dbenv->close(dbenv, 0);
293         if (ret) {
294                 lprintf(1, "cdb_*: DBENV->close: %s\n", db_strerror(ret));
295                 abort();
296         }
297 }
298
299 /*
300  * Store a piece of data.  Returns 0 if the operation was successful.  If a
301  * key already exists it should be overwritten.
302  */
303 int cdb_store(int cdb,
304               void *ckey, int ckeylen,
305               void *cdata, int cdatalen)
306 {
307
308         DBT dkey, ddata;
309         DB_TXN *tid;
310         int ret;
311
312         memset(&dkey, 0, sizeof(DBT));
313         memset(&ddata, 0, sizeof(DBT));
314         dkey.size = ckeylen;
315         dkey.data = ckey;
316         ddata.size = cdatalen;
317         ddata.data = cdata;
318
319         if (MYTID != NULL) {
320                 ret = dbp[cdb]->put(dbp[cdb],           /* db */
321                                         MYTID,          /* transaction ID */
322                                         &dkey,          /* key */
323                                         &ddata,         /* data */
324                                         0);             /* flags */
325                 if (ret) {
326                         lprintf(1, "cdb_store(%d): %s\n", cdb,
327                                 db_strerror(ret));
328                         abort();
329                 }
330                 return ret;
331         } else {
332             retry:
333                 if (txbegin(&tid))
334                         return -1;
335
336                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
337                                          tid,           /* transaction ID */
338                                          &dkey,         /* key */
339                                          &ddata,        /* data */
340                                          0))) {         /* flags */
341                         if (ret == DB_LOCK_DEADLOCK) {
342                                 if (txabort(tid))
343                                         return ret;
344                                 else
345                                         goto retry;
346                         } else {
347                                 lprintf(1, "cdb_store(%d): %s\n", cdb,
348                                         db_strerror(ret));
349                                 abort();
350                         }
351                 } else {
352                         return txcommit(tid);
353                 }
354         }
355 }
356
357
358 /*
359  * Delete a piece of data.  Returns 0 if the operation was successful.
360  */
361 int cdb_delete(int cdb, void *key, int keylen)
362 {
363
364         DBT dkey;
365         DB_TXN *tid;
366         int ret;
367
368         memset(&dkey, 0, sizeof dkey);
369         dkey.size = keylen;
370         dkey.data = key;
371
372         if (MYTID != NULL) {
373                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
374                 return (ret);
375         } else {
376             retry:
377                 if (txbegin(&tid))
378                         return -1;
379
380                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))) {
381                         if (ret == DB_LOCK_DEADLOCK) {
382                                         if (txabort(tid))
383                                                 return ret;
384                                         else
385                                                 goto retry;
386                         } else {
387                                 lprintf(1, "cdb_delete(%d): %s\n", cdb,
388                                         db_strerror(ret));
389                                 if (ret != DB_NOTFOUND) {
390                                         abort();
391                                 }
392                         }
393                 } else {
394                         return txcommit(tid);
395                 }
396         }
397         return(0);
398 }
399
400
401
402
403 /*
404  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
405  * a struct cdbdata which it is the caller's responsibility to free later on
406  * using the cdb_free() routine.
407  */
408 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
409 {
410
411         struct cdbdata *tempcdb;
412         DBT dkey, dret;
413         DB_TXN *tid;
414         int ret;
415
416         memset(&dkey, 0, sizeof(DBT));
417         memset(&dret, 0, sizeof(DBT));
418         dkey.size = keylen;
419         dkey.data = key;
420         dret.flags = DB_DBT_MALLOC;
421
422         if (MYTID != NULL) {
423                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
424         } else {
425             retry:
426                 if (txbegin(&tid))
427                         return NULL;
428
429                 ret = dbp[cdb]->get(dbp[cdb], tid, &dkey, &dret, 0);
430
431                 if (ret == DB_LOCK_DEADLOCK) {
432                         if (txabort(tid))
433                                 return NULL;
434                         else
435                                 goto retry;
436                 } else if (txcommit(tid))
437                         return NULL;
438         }
439
440         if ((ret != 0) && (ret != DB_NOTFOUND)) {
441                 lprintf(1, "cdb_fetch: %s\n", db_strerror(ret));
442                 abort();
443         }
444         if (ret != 0) return NULL;
445         tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
446         if (tempcdb == NULL) {
447                 lprintf(2, "cdb_fetch: Cannot allocate memory!\n");
448                 abort();
449         }
450         tempcdb->len = dret.size;
451         tempcdb->ptr = dret.data;
452         return (tempcdb);
453 }
454
455
456 /*
457  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
458  * more complex stuff with other database managers in the future).
459  */
460 void cdb_free(struct cdbdata *cdb)
461 {
462         phree(cdb->ptr);
463         phree(cdb);
464 }
465
466
467 /* 
468  * Prepare for a sequential search of an entire database.
469  * (There is guaranteed to be no more than one traversal in
470  * progress per thread at any given time.)
471  */
472 void cdb_rewind(int cdb)
473 {
474         int ret = 0;
475
476         if (MYCURSOR != NULL)
477                 MYCURSOR->c_close(MYCURSOR);
478
479         if (MYTID == NULL) {
480                 lprintf(1, "cdb_rewind: ERROR: cursor use outside transaction\n");
481                 abort();
482         }
483
484         /*
485          * Now initialize the cursor
486          */
487         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0);
488         if (ret) {
489                 lprintf(1, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
490                 abort();
491         }
492 }
493
494
495 /*
496  * Fetch the next item in a sequential search.  Returns a pointer to a 
497  * cdbdata structure, or NULL if we've hit the end.
498  */
499 struct cdbdata *cdb_next_item(int cdb)
500 {
501         DBT key, data;
502         struct cdbdata *cdbret;
503         int ret = 0;
504
505         /* Initialize the key/data pair so the flags aren't set. */
506         memset(&key, 0, sizeof(key));
507         memset(&data, 0, sizeof(data));
508         data.flags = DB_DBT_MALLOC;
509
510         ret = MYCURSOR->c_get(MYCURSOR,
511                 &key, &data, DB_NEXT);
512         
513         if (ret) {
514                 MYCURSOR->c_close(MYCURSOR);
515                 MYCURSOR = NULL;
516                 return NULL;            /* presumably, end of file */
517         }
518
519         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
520         cdbret->len = data.size;
521         cdbret->ptr = data.data;
522
523         return (cdbret);
524 }
525
526
527 /*
528  * Transaction-based stuff.  I'm writing this as I bake cookies...
529  */
530
531 void cdb_begin_transaction(void) {
532
533         if (MYTID != NULL) {    /* FIXME this slows it down, take it out */
534                 lprintf(1, "cdb_begin_transaction: ERROR: opening a new transaction with one already open!\n");
535                 abort();
536         }
537         else {
538                 txbegin(&MYTID);
539         }
540 }
541
542 void cdb_end_transaction(void) {
543         if (MYCURSOR != NULL) {
544                 lprintf(1, "cdb_end_transaction: WARNING: cursor still open at transaction end\n");
545                 MYCURSOR->c_close(MYCURSOR);
546                 MYCURSOR = NULL;
547         }
548         if (MYTID == NULL) {
549                 lprintf(1, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
550                 abort();
551         } else
552                 txcommit(MYTID);
553
554         MYTID = NULL;
555 }
556