* database_sleepycat.c: small changes to log messages
[citadel.git] / citadel / database_sleepycat.c
1 /*
2  * $Id$
3  *
4  * Sleepycat (Berkeley) DB driver for Citadel/UX
5  *
6  */
7
8 /*****************************************************************************
9        Tunable configuration parameters for the Sleepycat DB back end
10  *****************************************************************************/
11
12 /* Citadel will checkpoint the db at the end of every session, but only if
13  * the specified number of kilobytes has been written, or if the specified
14  * number of minutes has passed, since the last checkpoint.
15  */
16 #define MAX_CHECKPOINT_KBYTES   0
17 #define MAX_CHECKPOINT_MINUTES  15
18
19 /*****************************************************************************/
20
21 #include "sysdep.h"
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <dirent.h>
31 #include <db.h>
32 #include <pthread.h>
33 #include "citadel.h"
34 #include "server.h"
35 #include "citserver.h"
36 #include "database.h"
37 #include "sysdep_decls.h"
38 #include "dynloader.h"
39
40 DB *dbp[MAXCDB];                /* One DB handle for each Citadel database */
41 DB_ENV *dbenv;                  /* The DB environment (global) */
42
43 struct cdbtsd {                 /* Thread-specific DB stuff */
44         DB_TXN *tid;            /* Transaction handle */
45         DBC *cursor;            /* Cursor, for traversals... */
46 };
47
48 int num_ssd = 0;
49
50 static pthread_key_t tsdkey;
51
52 #define MYCURSOR        (((struct cdbtsd*)pthread_getspecific(tsdkey))->cursor)
53 #define MYTID           (((struct cdbtsd*)pthread_getspecific(tsdkey))->tid)
54
55 /* just a little helper function */
56 static void txabort(DB_TXN *tid) {
57         int ret = txn_abort(tid);
58
59         if (ret) {
60                 lprintf(1, "cdb_*: txn_abort: %s\n", db_strerror(ret));
61                 abort();
62         }
63 }
64
65 /* this one is even more helpful than the last. */
66 static void txcommit(DB_TXN *tid) {
67         int ret = txn_commit(tid, 0);
68
69         if (ret) {
70                 lprintf(1, "cdb_*: txn_commit: %s\n", db_strerror(ret));
71                 abort();
72         }
73 }
74
75 /* are you sensing a pattern yet? */
76 static void txbegin(DB_TXN **tid) {
77         int ret = txn_begin(dbenv, NULL, tid, 0);
78
79         if (ret) {
80                 lprintf(1, "cdb_*: txn_begin: %s\n", db_strerror(ret));
81                 abort();
82         }
83 }
84
85 static void cclose(DBC *cursor) {
86         int ret;
87
88         if ((ret = cursor->c_close(cursor))) {
89                 lprintf(1, "cdb_*: c_close: %s\n", db_strerror(ret));
90                 abort();
91         }
92 }
93
94 static void check_handles(void *arg) {
95         if (arg != NULL) {
96                 struct cdbtsd *tsd = (struct cdbtsd *)arg;
97
98                 if (tsd->cursor != NULL) {
99                         lprintf(1, "cdb_*: cursor still in progress!");
100                         abort();
101                 }
102
103                 if (tsd->tid != NULL) {
104                         lprintf(1, "cdb_*: transaction still in progress!");
105                         abort();
106                 }
107         }
108 }
109
110 static void dest_tsd(void *arg) {
111         if (arg != NULL) {
112                 check_handles(arg);
113                 phree(arg);
114         }
115 }
116
117 /*
118  * Ensure that we have a key for thread-specific data.  We don't
119  * put anything in here that Citadel cares about; this is just database
120  * related stuff like cursors and transactions.
121  *
122  * This should be called immediately after startup by any thread which wants
123  * to use database calls, except for whatever thread calls open_databases.
124  */
125 void cdb_allocate_tsd(void) {
126         struct cdbtsd *tsd;
127
128         if (pthread_getspecific(tsdkey) != NULL)
129                 return;
130
131         tsd = mallok(sizeof *tsd);
132
133         tsd->tid = NULL;
134         tsd->cursor = NULL;
135         pthread_setspecific(tsdkey, tsd);
136 }
137
138 void cdb_free_tsd(void) {
139         dest_tsd(pthread_getspecific(tsdkey));
140         pthread_setspecific(tsdkey, NULL);
141 }
142
143 void cdb_check_handles(void) {
144         check_handles(pthread_getspecific(tsdkey));
145 }
146
147
148 /*
149  * Reclaim unused space in the databases.  We need to do each one of
150  * these discretely, rather than in a loop.
151  *
152  * This is a stub function in the Sleepycat DB backend, because there is no
153  * such API call available.
154  */
155 void defrag_databases(void)
156 {
157         /* do nothing */
158 }
159
160
161 /*
162  * Cull the database logs
163  */
164 void cdb_cull_logs(void) {
165         DIR *dp;
166         struct dirent *d;
167         char filename[SIZ];
168         struct stat statbuf;
169
170         lprintf(5, "Database log file cull started.\n");
171
172         dp = opendir("data");
173         if (dp == NULL) return;
174
175         while (d = readdir(dp), d != NULL) {
176                 if (!strncasecmp(d->d_name, "log.", 4)) {
177                         sprintf(filename, "./data/%s", d->d_name);
178                         stat(filename, &statbuf);
179                         if ((time(NULL) - statbuf.st_mtime) > 432000L) {
180                                 lprintf(5, "%s ... deleted\n", filename);
181                                 unlink(filename);
182                         }
183                         else {
184                                 lprintf(5, "%s ... kept\n", filename);
185                         }
186                 }
187         }
188
189         closedir(dp);
190
191         lprintf(5, "Database log file cull ended.\n");
192 }
193
194
195 /*
196  * Request a checkpoint of the database.
197  */
198 static void cdb_checkpoint(void) {
199         int ret;
200         static time_t last_cull = 0L;
201
202         ret = txn_checkpoint(dbenv,
203                                 MAX_CHECKPOINT_KBYTES,
204                                 MAX_CHECKPOINT_MINUTES,
205                                 0);
206         if (ret) {
207                 lprintf(1, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret));
208                 abort();
209         }
210
211
212         /* Cull the logs if we haven't done so for 24 hours */
213         if ((time(NULL) - last_cull) > 86400L) {
214                 last_cull = time(NULL);
215                 cdb_cull_logs();
216         }
217
218 }
219
220 /*
221  * Open the various databases we'll be using.  Any database which
222  * does not exist should be created.  Note that we don't need an S_DATABASE
223  * critical section here, because there aren't any active threads manipulating
224  * the database yet -- and besides, it causes problems on BSDI.
225  */
226 void open_databases(void)
227 {
228         int ret;
229         int i;
230         char dbfilename[SIZ];
231         u_int32_t flags = 0;
232
233         lprintf(9, "cdb_*: open_databases() starting\n");
234         /*
235          * Silently try to create the database subdirectory.  If it's
236          * already there, no problem.
237          */
238         system("exec mkdir data 2>/dev/null");
239
240         lprintf(9, "cdb_*: Setting up DB environment\n");
241         db_env_set_func_yield(sched_yield);
242         ret = db_env_create(&dbenv, 0);
243         if (ret) {
244                 lprintf(1, "cdb_*: db_env_create: %s\n", db_strerror(ret));
245                 exit(ret);
246         }
247         dbenv->set_errpfx(dbenv, "citserver");
248
249         /*
250          * We want to specify the shared memory buffer pool cachesize,
251          * but everything else is the default.
252          */
253         ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
254         if (ret) {
255                 lprintf(1, "cdb_*: set_cachesize: %s\n", db_strerror(ret));
256                 dbenv->close(dbenv, 0);
257                 exit(ret);
258         }
259
260         if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
261                 lprintf(1, "cdb_*: set_lk_detect: %s\n", db_strerror(ret));
262                 dbenv->close(dbenv, 0);
263                 exit(ret);
264         }
265
266         flags = DB_CREATE|DB_RECOVER|DB_INIT_MPOOL|DB_PRIVATE|DB_INIT_TXN|
267                 DB_INIT_LOCK|DB_THREAD;
268         ret = dbenv->open(dbenv, "./data", flags, 0);
269         if (ret) {
270                 lprintf(1, "cdb_*: dbenv->open: %s\n", db_strerror(ret));
271                 dbenv->close(dbenv, 0);
272                 exit(ret);
273         }
274
275         lprintf(7, "cdb_*: Starting up DB\n");
276
277         for (i = 0; i < MAXCDB; ++i) {
278
279                 /* Create a database handle */
280                 ret = db_create(&dbp[i], dbenv, 0);
281                 if (ret) {
282                         lprintf(1, "cdb_*: db_create: %s\n", db_strerror(ret));
283                         exit(ret);
284                 }
285
286
287                 /* Arbitrary names for our tables -- we reference them by
288                  * number, so we don't have string names for them.
289                  */
290                 sprintf(dbfilename, "cdb.%02x", i);
291
292                 ret = dbp[i]->open(dbp[i],
293                                 dbfilename,
294                                 NULL,
295                                 DB_BTREE,
296                                 DB_CREATE|DB_THREAD,
297                                 0600);
298                 if (ret) {
299                         lprintf(1, "cdb_*: db_open[%d]: %s\n", i, db_strerror(ret));
300                         exit(ret);
301                 }
302         }
303
304         if ((ret = pthread_key_create(&tsdkey, dest_tsd))) {
305                 lprintf(1, "cdb_*: pthread_key_create: %s\n", strerror(ret));
306                 exit(1);
307         }
308
309         cdb_allocate_tsd();
310         CtdlRegisterSessionHook(cdb_checkpoint, EVT_TIMER);
311         lprintf(9, "cdb_*: open_databases() finished\n");
312 }
313
314
315 /*
316  * Close all of the db database files we've opened.  This can be done
317  * in a loop, since it's just a bunch of closes.
318  */
319 void close_databases(void)
320 {
321         int a;
322         int ret;
323
324         cdb_free_tsd();
325
326         if ((ret = txn_checkpoint(dbenv, 0, 0, 0))) {
327                 lprintf(1, "cdb_*: txn_checkpoint: %s\n", db_strerror(ret));
328                 abort();
329         }
330
331         for (a = 0; a < MAXCDB; ++a) {
332                 lprintf(7, "cdb_*: Closing database %d\n", a);
333                 ret = dbp[a]->close(dbp[a], 0);
334                 if (ret) {
335                         lprintf(1, "cdb_*: db_close: %s\n", db_strerror(ret));
336                         abort();
337                 }
338                 
339         }
340
341         /* Close the handle. */
342         ret = dbenv->close(dbenv, 0);
343         if (ret) {
344                 lprintf(1, "cdb_*: DBENV->close: %s\n", db_strerror(ret));
345                 abort();
346         }
347 }
348
349 /*
350  * Store a piece of data.  Returns 0 if the operation was successful.  If a
351  * key already exists it should be overwritten.
352  */
353 int cdb_store(int cdb,
354               void *ckey, int ckeylen,
355               void *cdata, int cdatalen)
356 {
357
358         DBT dkey, ddata;
359         DB_TXN *tid;
360         int ret;
361
362         memset(&dkey, 0, sizeof(DBT));
363         memset(&ddata, 0, sizeof(DBT));
364         dkey.size = ckeylen;
365         dkey.data = ckey;
366         ddata.size = cdatalen;
367         ddata.data = cdata;
368
369         if (MYTID != NULL) {
370                 ret = dbp[cdb]->put(dbp[cdb],           /* db */
371                                         MYTID,          /* transaction ID */
372                                         &dkey,          /* key */
373                                         &ddata,         /* data */
374                                         0);             /* flags */
375                 if (ret) {
376                         lprintf(1, "cdb_store(%d): %s\n", cdb,
377                                 db_strerror(ret));
378                         abort();
379                 }
380                 return ret;
381         } else {
382             retry:
383                 txbegin(&tid);
384
385                 if ((ret = dbp[cdb]->put(dbp[cdb],      /* db */
386                                          tid,           /* transaction ID */
387                                          &dkey,         /* key */
388                                          &ddata,        /* data */
389                                          0))) {         /* flags */
390                         if (ret == DB_LOCK_DEADLOCK) {
391                                 txabort(tid);
392                                 goto retry;
393                         } else {
394                                 lprintf(1, "cdb_store(%d): %s\n", cdb,
395                                         db_strerror(ret));
396                                 abort();
397                         }
398                 } else {
399                         txcommit(tid);
400                         return ret;
401                 }
402         }
403 }
404
405
406 /*
407  * Delete a piece of data.  Returns 0 if the operation was successful.
408  */
409 int cdb_delete(int cdb, void *key, int keylen)
410 {
411
412         DBT dkey;
413         DB_TXN *tid;
414         int ret;
415
416         memset(&dkey, 0, sizeof dkey);
417         dkey.size = keylen;
418         dkey.data = key;
419
420         if (MYTID != NULL) {
421                 ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0);
422                 if (ret) {
423                         lprintf(1, "cdb_delete(%d): %s\n", cdb,
424                                 db_strerror(ret));
425                         if (ret != DB_NOTFOUND)
426                                 abort();
427                 }
428         } else {
429             retry:
430                 txbegin(&tid);
431
432                 if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
433                     && ret != DB_NOTFOUND) {
434                         if (ret == DB_LOCK_DEADLOCK) {
435                                         txabort(tid);
436                                         goto retry;
437                         } else {
438                                 lprintf(1, "cdb_delete(%d): %s\n", cdb,
439                                         db_strerror(ret));
440                                 abort();
441                         }
442                 } else {
443                         txcommit(tid);
444                 }
445         }
446         return ret;
447 }
448
449
450
451
452 /*
453  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
454  * a struct cdbdata which it is the caller's responsibility to free later on
455  * using the cdb_free() routine.
456  */
457 struct cdbdata *cdb_fetch(int cdb, void *key, int keylen)
458 {
459
460         struct cdbdata *tempcdb;
461         DBT dkey, dret;
462         DB_TXN *tid;
463         int ret;
464
465         memset(&dkey, 0, sizeof(DBT));
466         dkey.size = keylen;
467         dkey.data = key;
468
469         if (MYTID != NULL) {
470                 memset(&dret, 0, sizeof(DBT));
471                 dret.flags = DB_DBT_MALLOC;
472                 ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0);
473         } else {
474             retry:
475                 memset(&dret, 0, sizeof(DBT));
476                 dret.flags = DB_DBT_MALLOC;
477
478                 txbegin(&tid);
479
480                 ret = dbp[cdb]->get(dbp[cdb], tid, &dkey, &dret, 0);
481
482                 if (ret == DB_LOCK_DEADLOCK) {
483                         txabort(tid);
484                         goto retry;
485                 }
486                 if (ret && ret != DB_NOTFOUND)
487                         abort();
488
489                 txcommit(tid);
490         }
491
492         if ((ret != 0) && (ret != DB_NOTFOUND)) {
493                 lprintf(1, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
494                 abort();
495         }
496         if (ret != 0) return NULL;
497         tempcdb = (struct cdbdata *) mallok(sizeof(struct cdbdata));
498         if (tempcdb == NULL) {
499                 lprintf(2, "cdb_fetch: Cannot allocate memory for tempcdb\n");
500                 abort();
501         }
502         tempcdb->len = dret.size;
503         tempcdb->ptr = dret.data;
504         return (tempcdb);
505 }
506
507
508 /*
509  * Free a cdbdata item (ok, this is really no big deal, but we might need to do
510  * more complex stuff with other database managers in the future).
511  */
512 void cdb_free(struct cdbdata *cdb)
513 {
514         phree(cdb->ptr);
515         phree(cdb);
516 }
517
518
519 /* 
520  * Prepare for a sequential search of an entire database.
521  * (There is guaranteed to be no more than one traversal in
522  * progress per thread at any given time.)
523  */
524 void cdb_rewind(int cdb)
525 {
526         int ret = 0;
527
528         if (MYCURSOR != NULL)
529                 cclose(MYCURSOR);
530
531         if (MYTID == NULL) {
532                 lprintf(1, "cdb_rewind: ERROR: cursor use outside transaction\n");
533                 abort();
534         }
535
536         /*
537          * Now initialize the cursor
538          */
539         ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSOR, 0);
540         if (ret) {
541                 lprintf(1, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
542                 abort();
543         }
544 }
545
546
547 /*
548  * Fetch the next item in a sequential search.  Returns a pointer to a 
549  * cdbdata structure, or NULL if we've hit the end.
550  */
551 struct cdbdata *cdb_next_item(int cdb)
552 {
553         DBT key, data;
554         struct cdbdata *cdbret;
555         int ret = 0;
556
557         /* Initialize the key/data pair so the flags aren't set. */
558         memset(&key, 0, sizeof(key));
559         memset(&data, 0, sizeof(data));
560         data.flags = DB_DBT_MALLOC;
561
562         ret = MYCURSOR->c_get(MYCURSOR,
563                 &key, &data, DB_NEXT);
564         
565         if (ret) {
566                 if (ret != DB_NOTFOUND) {
567                         lprintf(1, "cdb_next_item(%d): %s\n",
568                                 cdb, db_strerror(ret));
569                         abort();
570                 }
571                 cclose(MYCURSOR);
572                 MYCURSOR = NULL;
573                 return NULL;            /* presumably, end of file */
574         }
575
576         cdbret = (struct cdbdata *) mallok(sizeof(struct cdbdata));
577         cdbret->len = data.size;
578         cdbret->ptr = data.data;
579
580         return (cdbret);
581 }
582
583
584 /*
585  * Transaction-based stuff.  I'm writing this as I bake cookies...
586  */
587
588 void cdb_begin_transaction(void) {
589
590         /******** this check slows it down and is not needed except to debug
591         if (MYTID != NULL) {
592                 lprintf(1, "cdb_begin_transaction: ERROR: opening a new transaction with one already open!\n");
593                 abort();
594         }
595         ***************************/
596
597         txbegin(&MYTID);
598 }
599
600 void cdb_end_transaction(void) {
601         if (MYCURSOR != NULL) {
602                 lprintf(1, "cdb_end_transaction: WARNING: cursor still open at transaction end\n");
603                 cclose(MYCURSOR);
604                 MYCURSOR = NULL;
605         }
606         if (MYTID == NULL) {
607                 lprintf(1, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
608                 abort();
609         } else
610                 txcommit(MYTID);
611
612         MYTID = NULL;
613 }
614