- added a new function to the database interface, cdb_close_cursor(). always
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   0
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31 #include <db.h>
32 #include <pthread.h>
33 #include "citadel.h"
34 #include "server.h"
35 #include "citserver.h"
36 #include "database.h"
37 #include "sysdep_decls.h"
38 #include "dynloader.h"
39
40 static DB *dbp[MAXCDB];         /* One DB handle for each Citadel database */
41 static DB_ENV *dbenv;           /* The DB environment (global) */
42
43 struct cdbtsd {                 /* Thread-specific DB stuff */
44         DB_TXN *tid;            /* Transaction handle */
45         DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
46 };
47
48 static pthread_key_t tsdkey;
49
50 #define MYCURSORS       (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursors)
51 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
52
53 /* just a little helper function */
54 static void txabort(DB_TXN *tid) {
55         int ret = txn_abort(tid);
56
57         if (ret) {
58                 lprintf(1, "cdb_*: txn_abort: %s\n", db_strerror(ret));
59                 abort();
60         }
61 }
62
63 /* this one is even more helpful than the last. */
64 static void txcommit(DB_TXN *tid) {
65         int ret = txn_commit(tid, 0);
66
67         if (ret) {
68                 lprintf(1, "cdb_*: txn_commit: %s\n", db_strerror(ret));
69                 abort();
70         }
71 }
72
73 /* are you sensing a pattern yet? */
74 static void txbegin(DB_TXN **tid) {
75         int ret = txn_begin(dbenv, NULL, tid, 0);
76
77         if (ret) {
78                 lprintf(1, "cdb_*: txn_begin: %s\n", db_strerror(ret));
79                 abort();
80         }
81 }
82
83 static void cclose(DBC *cursor) {
84         int ret;
85
86         if ((ret = cursor->c_close(cursor))) {
87                 lprintf(1, "cdb_*: c_close: %s\n", db_strerror(ret));
88                 abort();
89         }
90 }
91
92 static void bailIfCursor(DBC **cursors, const char *msg)
93 {
94   int i;
95
96   for (i = 0; i < MAXCDB; i++)
97     if (cursors[i] != NULL)
98       {
99         lprintf(1, "cdb_*: cursor still in progress on cdb %d: %s\n", i, msg);
100         abort();
101       }
102 }
103
104 static void check_handles(void *arg) {
105         if (arg != NULL) {
106                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
107
108                 bailIfCursor(tsd->cursors, "in check_handles");
109
110                 if (tsd->tid != NULL) {
111                         lprintf(1, "cdb_*: transaction still in progress!");
112                         abort();
113                 }
114         }
115 }
116
117 static void dest_tsd(void *arg) {
118         if (arg != NULL) {
119                 check_handles(arg);
120                 phree(arg);
121         }
122 }
123
124 /*
125  * Ensure that we have a key for thread-specific data.  We don't
126  * put anything in here that Citadel cares about; this is just database
127  * related stuff like cursors and transactions.
128  *
129  * This should be called immediately after startup by any thread which wants
130  * to use database calls, except for whatever thread calls open_databases.
131  */
132 void cdb_allocate_tsd(void) {
133         struct cdbtsd *tsd;
134
135         if (pthread_getspecific(tsdkey) != NULL)
136                 return;
137
138         tsd = mallok(sizeof *tsd);
139
140         tsd->tid = NULL;
141
142         memset(tsd->cursors, 0, sizeof tsd->cursors);
143         pthread_setspecific(tsdkey, tsd);
144 }
145
146 void cdb_free_tsd(void) {
147         dest_tsd(pthread_getspecific(tsdkey));
148         pthread_setspecific(tsdkey, NULL);
149 }
150
151 void cdb_check_handles(void) {
152         check_handles(pthread_getspecific(tsdkey));
153 }
154
155
156 /*
157  * Reclaim unused space in the databases.  We need to do each one of
158  * these discretely, rather than in a loop.
159  *
160  * This is a stub function in the Sleepycat DB backend, because there is no
161  * such API call available.
162  */
163 void defrag_databases(void)
164 {
165         /* do nothing */
166 }
167
168
169 /*
170  * Cull the database logs
171  */
172 static void cdb_cull_logs(void) {
173         DIR *dp;
174         struct dirent *d;
175         char filename[SIZ];
176         struct stat statbuf;
177
178         lprintf(5, "Database log file cull started.\n");
179
180         dp = opendir("data");
181         if (dp == NULL) return;
182
183         while (d = readdir(dp), d != NULL) {
184                 if (!strncasecmp(d->d_name, "log.", 4)) {
185                         sprintf(filename, "./data/%s", d->d_name);
186                         stat(filename, &statbuf);
187                         if ((time(NULL) - statbuf.st_mtime) > 432000L) {
188                                 lprintf(5, "%s ... deleted\n", filename);
189                                 unlink(filename);
190                         }
191                         else {
192                                 lprintf(5, "%s ... kept\n", filename);
193                         }
194                 }
195         }
196
197         closedir(dp);
198
199         lprintf(5, "Database log file cull ended.\n");
200 }
201
202
203 /*
204  * Request a checkpoint of the database.
205  */
206 static void cdb_checkpoint(void) {
207         int ret;
208         static time_t last_cull = 0L;
209
210         ret = txn_checkpoint(dbenv,
211                                 MAX_CHECKPOINT_KBYTES,
212                                 MAX_CHECKPOINT_MINUTES,
213                                 0);
214         if (ret) {
215                 lprintf(1, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
216                 abort();
217         }
218
219
220         /* Cull the logs if we haven't done so for 24 hours */
221         if ((time(NULL) - last_cull) > 86400L) {
222                 last_cull = time(NULL);
223                 cdb_cull_logs();
224         }
225
226 }
227
228 /*
229  * Open the various databases we'll be using.  Any database which
230  * does not exist should be created.  Note that we don't need an S_DATABASE
231  * critical section here, because there aren't any active threads manipulating
232  * the database yet -- and besides, it causes problems on BSDI.
233  */
234 void open_databases(void)
235 {
236         int ret;
237         int i;
238         char dbfilename[SIZ];
239         u_int32_t flags = 0;
240
241         lprintf(9, "cdb_*: open_databases() starting\n");
242         /*
243          * Silently try to create the database subdirectory.  If it's
244          * already there, no problem.
245          */
246         system("exec mkdir data 2>/dev/null");
247
248         lprintf(9, "cdb_*: Setting up DB environment\n");
249         db_env_set_func_yield(sched_yield);
250         ret = db_env_create(&dbenv, 0);
251         if (ret) {
252                 lprintf(1, "cdb_*: db_env_create: %s\n", db_strerror(ret));
253                 exit(ret);
254         }
255         dbenv->set_errpfx(dbenv, "citserver");
256
257         /*
258          * We want to specify the shared memory buffer pool cachesize,
259          * but everything else is the default.
260          */
261         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
262         if (ret) {
263                 lprintf(1, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
264                 dbenv->close(dbenv, 0);
265                 exit(ret);
266         }
267
268         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
269                 lprintf(1, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
270                 dbenv->close(dbenv, 0);
271                 exit(ret);
272         }
273
274         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
275                 DB_INIT_LOCK|DB_THREAD;
276         ret = dbenv->open(dbenv, "./data", flags, 0);
277         if (ret) {
278                 lprintf(1, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
279                 dbenv->close(dbenv, 0);
280                 exit(ret);
281         }
282
283         lprintf(7, "cdb_*: Starting up DB\n");
284
285         for (i = 0; i < MAXCDB; ++i) {
286
287                 /* Create a database handle */
288                 ret = db_create(&dbp[i], dbenv, 0);
289                 if (ret) {
290                         lprintf(1, "cdb_*: db_create: %s\n", db_strerror(ret));
291                         exit(ret);
292                 }
293
294
295                 /* Arbitrary names for our tables -- we reference them by
296                  * number, so we don't have string names for them.
297                  */
298                 sprintf(dbfilename, "cdb.%02x", i);
299
300                 ret = dbp[i]->open(dbp[i],
301                                 dbfilename,
302                                 NULL,
303                                 DB_BTREE,
304                                 DB_CREATE|DB_THREAD,
305                                 0600);
306                 if (ret) {
307                         lprintf(1, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
308                         exit(ret);
309                 }
310         }
311
312         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
313                 lprintf(1, "cdb_*: pthread_key_create: %s\n", strerror(ret));
314                 exit(1);
315         }
316
317         cdb_allocate_tsd();
318         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
319         lprintf(9, "cdb_*: open_databases() finished\n");
320 }
321
322
323 /*
324  * Close all of the db database files we've opened.  This can be done
325  * in a loop, since it's just a bunch of closes.
326  */
327 void close_databases(void)
328 {
329         int a;
330         int ret;
331
332         cdb_free_tsd();
333
334         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
335                 lprintf(1, "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
336                 abort();
337         }
338
339         for (a = 0; a < MAXCDB; ++a) {
340                 lprintf(7, "cdb_*: Closing database %d\n", a);
341                 ret = dbp[a]->close(dbp[a], 0);
342                 if (ret) {
343                         lprintf(1, "cdb_*: db_close: %s\n", db_strerror(ret));
344                         abort();
345                 }
346                 
347         }
348
349         /* Close the handle. */
350         ret = dbenv->close(dbenv, 0);
351         if (ret) {
352                 lprintf(1, "cdb_*: DBENV->close: %s\n", db_strerror(ret));
353                 abort();
354         }
355 }
356
357 /*
358  * Store a piece of data.  Returns 0 if the operation was successful.  If a
359  * key already exists it should be overwritten.
360  */
361 int cdb_store(int cdb,
362               void *ckey, int ckeylen,
363               void *cdata, int cdatalen)
364 {
365
366   DBT dkey, ddata;
367   DB_TXN *tid;
368   int ret;
369   
370   memset(&dkey, 0, sizeof(DBT));
371   memset(&ddata, 0, sizeof(DBT));
372   dkey.size = ckeylen;
373   dkey.data = ckey;
374   ddata.size = cdatalen;
375   ddata.data = cdata;
376   
377   if (MYTID != NULL)
378     {
379       ret = dbp[cdb]->put(dbp[cdb],             /* db */
380                           MYTID,                /* transaction ID */
381                           &dkey,                /* key */
382                           &ddata,               /* data */
383                           0);           /* flags */
384       if (ret)
385         {
386           lprintf(1, "cdb_store(%d): %s\n", cdb,
387                   db_strerror(ret));
388           abort();
389         }
390       return ret;
391       
392     }
393   else
394     {
395       bailIfCursor(MYCURSORS, "attempt to write during r/o cursor");
396       
397     retry:
398       txbegin(&tid);
399       
400       if ((ret = dbp[cdb]->put(dbp[cdb],    /* db */
401                                tid,         /* transaction ID */
402                                &dkey,       /* key */
403                                &ddata,      /* data */
404                                0)))         /* flags */
405         {
406           if (ret == DB_LOCK_DEADLOCK)
407             {
408               txabort(tid);
409               goto retry;
410             }
411           else
412             {
413               lprintf(1, "cdb_store(%d): %s\n", cdb,
414                       db_strerror(ret));
415               abort();
416             }
417         }
418       else
419         {
420           txcommit(tid);
421           return ret;
422         }
423     }
424 }
425
426
427 /*
428  * Delete a piece of data.  Returns 0 if the operation was successful.
429  */
430 int cdb_delete(int cdb, void *key, int keylen)
431 {
432
433   DBT dkey;
434   DB_TXN *tid;
435   int ret;
436
437   memset(&dkey, 0, sizeof dkey);
438   dkey.size = keylen;
439   dkey.data = key;
440
441   if (MYTID != NULL)
442     {
443       ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
444       if (ret)
445         {
446           lprintf(1, "cdb_delete(%d): %s\n", cdb,
447                   db_strerror(ret));
448           if (ret != DB_NOTFOUND)
449             abort();
450         }
451     }
452   else
453     {
454       bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor");
455     
456     retry:
457       txbegin(&tid);
458     
459       if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
460           && ret != DB_NOTFOUND)
461         {
462           if (ret == DB_LOCK_DEADLOCK)
463             {
464               txabort(tid);
465               goto retry;
466             }
467           else
468             {
469               lprintf(1, "cdb_delete(%d): %s\n", cdb,
470                       db_strerror(ret));
471               abort();
472             }
473         }
474       else
475         {
476           txcommit(tid);
477         }
478     }
479   return ret;
480 }
481
482 static DBC *localcursor(int cdb)
483 {
484   int ret;
485   DBC *curs;
486
487   if (MYCURSORS[cdb] == NULL)
488     ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0);
489   else
490     ret = MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, DB_POSITION);
491
492   if (ret)
493     {
494       lprintf(1, "localcursor: %s\n", db_strerror(ret));
495       abort();
496     }
497
498   return curs;
499 }
500
501
502 /*
503  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
504  * a struct cdbdata which it is the caller's responsibility to free later on
505  * using the cdb_free() routine.
506  */
507 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
508 {
509
510   struct cdbdata *tempcdb;
511   DBT dkey, dret;
512   int ret;
513
514   memset(&dkey, 0, sizeof(DBT));
515   dkey.size = keylen;
516   dkey.data = key;
517
518   if (MYTID != NULL)
519     {
520       memset(&dret, 0, sizeof(DBT));
521       dret.flags = DB_DBT_MALLOC;
522       ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
523     }
524   else
525     {
526       DBC *curs;
527
528       do
529         {
530           memset(&dret, 0, sizeof(DBT));
531           dret.flags = DB_DBT_MALLOC;
532
533           curs = localcursor(cdb);
534
535           ret = curs->c_get(curs, &dkey, &dret, DB_SET);
536           cclose(curs);
537         }
538       while (ret == DB_LOCK_DEADLOCK);
539
540     }
541
542   if ((ret != 0) && (ret != DB_NOTFOUND))
543     {
544       lprintf(1, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
545       abort();
546     }
547
548   if (ret != 0) return NULL;
549   tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
550
551   if (tempcdb == NULL)
552     {
553       lprintf(2, "cdb_fetch: Cannot allocate memory for tempcdb\n");
554       abort();
555     }
556
557   tempcdb->len = dret.size;
558   tempcdb->ptr = dret.data;
559   return (tempcdb);
560 }
561
562
563 /*
564  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
565  * more complex stuff with other database managers in the future).
566  */
567 void cdb_free(struct cdbdata *cdb)
568 {
569         phree(cdb->ptr);
570         phree(cdb);
571 }
572
573 void cdb_close_cursor(int cdb)
574 {
575         if (MYCURSORS[cdb] != NULL)
576                 cclose(MYCURSORS[cdb]);
577
578         MYCURSORS[cdb] = NULL;
579 }
580
581 /* 
582  * Prepare for a sequential search of an entire database.
583  * (There is guaranteed to be no more than one traversal in
584  * progress per thread at any given time.)
585  */
586 void cdb_rewind(int cdb)
587 {
588         int ret = 0;
589
590         if (MYCURSORS[cdb] != NULL)
591                 cclose(MYCURSORS[cdb]);
592
593         /*
594          * Now initialize the cursor
595          */
596         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0);
597         if (ret) {
598                 lprintf(1, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
599                 abort();
600         }
601 }
602
603
604 /*
605  * Fetch the next item in a sequential search.  Returns a pointer to a 
606  * cdbdata structure, or NULL if we've hit the end.
607  */
608 struct cdbdata *cdb_next_item(int cdb)
609 {
610         DBT key, data;
611         struct cdbdata *cdbret;
612         int ret = 0;
613
614         /* Initialize the key/data pair so the flags aren't set. */
615         memset(&key, 0, sizeof(key));
616         memset(&data, 0, sizeof(data));
617         data.flags = DB_DBT_MALLOC;
618
619         ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb],
620                 &key, &data, DB_NEXT);
621         
622         if (ret) {
623                 if (ret != DB_NOTFOUND) {
624                         lprintf(1, "cdb_next_item(%d): %s\n",
625                                 cdb, db_strerror(ret));
626                         abort();
627                 }
628                 cclose(MYCURSORS[cdb]);
629                 MYCURSORS[cdb] = NULL;
630                 return NULL;            /* presumably, end of file */
631         }
632
633         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
634         cdbret->len = data.size;
635         cdbret->ptr = data.data;
636
637         return (cdbret);
638 }
639
640
641 /*
642  * Transaction-based stuff.  I'm writing this as I bake cookies...
643  */
644
645 void cdb_begin_transaction(void) {
646
647   bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor");
648
649   if (MYTID != NULL)
650     {
651       lprintf(1, "cdb_begin_transaction: ERROR: nested transaction\n");
652       abort();
653     }
654
655   txbegin(&MYTID);
656 }
657
658 void cdb_end_transaction(void) {
659   int i;
660
661   for (i = 0; i < MAXCDB; i++)
662     if (MYCURSORS[i] != NULL) {
663       lprintf(1, "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n", i);
664       cclose(MYCURSORS[i]);
665       MYCURSORS[i] = NULL;
666     }
667
668   if (MYTID == NULL)
669     {
670       lprintf(1, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
671       abort();
672     }
673   else
674     txcommit(MYTID);
675
676   MYTID = NULL;
677 }
678