From 3291d029e1d490e5cf9d5518630d78516752eb35 Mon Sep 17 00:00:00 2001 From: Art Cancro Date: Tue, 9 Jan 2001 06:05:10 +0000 Subject: [PATCH] * more merges --- citadel/database.h | 4 + citadel/database_sleepycat.c | 2 +- citadel/server.h | 1 + citadel/sysdep.c | 158 ++++++++++++++++++++++++++--------- citadel/sysdep_decls.h | 3 +- 5 files changed, 128 insertions(+), 40 deletions(-) diff --git a/citadel/database.h b/citadel/database.h index 399f2013a..3df6edf62 100644 --- a/citadel/database.h +++ b/citadel/database.h @@ -10,3 +10,7 @@ void cdb_rewind (int cdb); struct cdbdata *cdb_next_item (int cdb); void cdb_begin_transaction(void); void cdb_end_transaction(void); +void cdb_allocate_tsd(void); +void cdb_free_tsd(void); +void cdb_release_handles(void); + diff --git a/citadel/database_sleepycat.c b/citadel/database_sleepycat.c index d5c0c4e49..82d886e89 100644 --- a/citadel/database_sleepycat.c +++ b/citadel/database_sleepycat.c @@ -170,7 +170,7 @@ void open_databases(void) { int ret; int i; - char dbfilename[256]; + char dbfilename[SIZ]; u_int32_t flags = 0; lprintf(9, "open_databases() starting\n"); diff --git a/citadel/server.h b/citadel/server.h index 55d81c747..bfb95ccba 100644 --- a/citadel/server.h +++ b/citadel/server.h @@ -174,6 +174,7 @@ enum { S_SUPPMSGMAIN, S_I_WANNA_SELECT, S_CONFIG, + S_WORKER_LIST, MAX_SEMAPHORES }; diff --git a/citadel/sysdep.c b/citadel/sysdep.c index 107d40df5..74a71d63b 100644 --- a/citadel/sysdep.c +++ b/citadel/sysdep.c @@ -72,7 +72,7 @@ int verbosity = DEFAULT_VERBOSITY; /* Logging level */ struct CitContext masterCC; int rescan[2]; /* The Rescan Pipe */ time_t last_purge = 0; /* Last dead session purge */ -int num_threads = 0; /* Current number of threads */ +static int num_threads = 0; /* Current number of threads */ int num_sessions = 0; /* Current number of sessions */ fd_set masterfds; /* Master sockets etc. */ @@ -80,6 +80,8 @@ int masterhighest; time_t last_timer = 0L; /* Last timer hook processing */ +static pthread_t initial_thread; /* tid for main() thread */ + /* * lprintf() ... Write logging information @@ -194,7 +196,7 @@ void dump_tracked() { * that it's time to call master_cleanup() and exit. */ -static volatile int time_to_die = 0; +volatile int time_to_die = 0; static RETSIGTYPE signal_cleanup(int signum) { time_to_die = 1; @@ -712,6 +714,37 @@ int convert_login(char NameToConvert[]) { } } +static struct worker_node { + pthread_t tid; + struct worker_node *next; +} *worker_list = NULL; + + +/* + * create a worker thread. this function must always be called from within + * an S_WORKER_LIST critical section! + */ +static void create_worker(void) { + int ret; + struct worker_node *n = mallok(sizeof *n); + + if (n == NULL) { + lprintf(1, "can't allocate worker_node, exiting\n"); + time_to_die = -1; + return; + } + + if ((ret = pthread_create(&n->tid, NULL, worker_thread, NULL) != 0)) + { + + lprintf(1, "Can't create worker thread: %s\n", + strerror(ret)); + } + + n->next = worker_list; + worker_list = n; +} + /* @@ -725,8 +758,8 @@ int convert_login(char NameToConvert[]) { */ void dead_session_purge(void) { struct CitContext *ptr, *rem; - pthread_attr_t attr; - pthread_t newthread; + struct worker_node **node, *tmp; + pthread_t self; if ( (time(NULL) - last_purge) < 5 ) return; /* Too soon, go away */ time(&last_purge); @@ -756,22 +789,38 @@ void dead_session_purge(void) { * an action is appropriate. */ + self = pthread_self(); + if ( (num_sessions > num_threads) && (num_threads < config.c_max_workers) ) { - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&newthread, &attr, - (void* (*)(void*)) worker_thread, NULL) != 0) { - lprintf(1, "Can't create worker thead: %s\n", - strerror(errno)); - } - + begin_critical_section(S_WORKER_LIST); + create_worker(); + end_critical_section(S_WORKER_LIST); } + /* don't let the initial thread die since it's responsible for + waiting for all the other threads to terminate. */ else if ( (num_sessions < num_threads) - && (num_threads > config.c_min_workers) ) { + && (num_threads > config.c_min_workers) + && (self != initial_thread) ) { + cdb_free_tsd(); + begin_critical_section(S_WORKER_LIST); --num_threads; + + /* we're exiting before server shutdown... unlink ourself from + the worker list and detach our thread to avoid memory leaks + */ + + for (node = &worker_list; *node != NULL; node = &(*node)->next) + if ((*node)->tid == self) { + tmp = *node; + *node = (*node)->next; + phree(tmp); + break; + } + + pthread_detach(self); + end_critical_section(S_WORKER_LIST); pthread_exit(NULL); } @@ -848,13 +897,12 @@ void init_master_fdset(void) { */ int main(int argc, char **argv) { - pthread_t WorkerThread; /* Thread descriptor */ - pthread_attr_t attr; /* Thread attributes */ char tracefile[128]; /* Name of file to log traces to */ int a, i; /* General-purpose variables */ struct passwd *pw; int drop_root_perms = 1; char *moddir; + struct worker_node *wnp; /* specify default port name and trace file */ strcpy(tracefile, ""); @@ -995,25 +1043,39 @@ int main(int argc, char **argv) /* We want to check for idle sessions once per minute */ CtdlRegisterSessionHook(terminate_idle_sessions, EVT_TIMER); - + /* * Now create a bunch of worker threads. */ + lprintf(9, "Starting %d worker threads\n", config.c_min_workers-1); + begin_critical_section(S_WORKER_LIST); for (i=0; i<(config.c_min_workers-1); ++i) { - lprintf(9, "Creating worker thread %d\n", i); - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&WorkerThread, &attr, - (void* (*)(void*)) worker_thread, NULL) != 0) { - lprintf(1, "Can't create worker thead: %s\n", - strerror(errno)); - } + create_worker(); } - + end_critical_section(S_WORKER_LIST); /* Now this thread can become a worker as well. */ - lprintf(9, "Original thread entering worker loop\n"); - worker_thread(); + initial_thread = pthread_self(); + worker_thread(NULL); + + /* Server is exiting. Wait for workers to shutdown. */ + lprintf(7, "Waiting for worker threads to shut down\n"); + + begin_critical_section(S_WORKER_LIST); + while (worker_list != NULL) { + wnp = worker_list; + worker_list = wnp->next; + + /* avoid deadlock with an exiting thread */ + end_critical_section(S_WORKER_LIST); + if ((i = pthread_join(wnp->tid, NULL))) + lprintf(1, "pthread_join: %s\n", strerror(i)); + phree(wnp); + begin_critical_section(S_WORKER_LIST); + } + end_critical_section(S_WORKER_LIST); + + master_cleanup(); return(0); } @@ -1031,7 +1093,7 @@ inline void become_session(struct CitContext *which_con) { /* * This loop just keeps going and going and going... */ -void worker_thread(void) { +void *worker_thread(void *arg) { int i; char junk; int highest; @@ -1046,7 +1108,9 @@ void worker_thread(void) { int ssock; /* Descriptor for client socket */ struct timeval tv; - ++num_threads; + num_threads++; + + cdb_allocate_tsd(); while (!time_to_die) { @@ -1060,6 +1124,11 @@ void worker_thread(void) { * context we want and go on our merry way. */ + /* make doubly sure we're not holding any stale db handles + * which might cause a deadlock. + */ + cdb_release_handles(); + begin_critical_section(S_I_WANNA_SELECT); SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds); highest = masterhighest; @@ -1073,17 +1142,26 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds); } end_critical_section(S_SESSION_TABLE); - tv.tv_sec = 60; /* wake up every minute if no input */ + tv.tv_sec = 1; /* wake up every second if no input */ tv.tv_usec = 0; - retval = select(highest + 1, &readfds, NULL, NULL, &tv); + + do_select: + if (!time_to_die) + retval = select(highest + 1, &readfds, NULL, NULL, &tv); + else { + end_critical_section(S_I_WANNA_SELECT); + break; + } /* Now figure out who made this select() unblock. * First, check for an error or exit condition. */ if (retval < 0) { - end_critical_section(S_I_WANNA_SELECT); - lprintf(9, "Exiting (%s)\n", strerror(errno)); - time_to_die = 1; + if (errno != EINTR) { + lprintf(9, "Exiting (%s)\n", strerror(errno)); + time_to_die = 1; + } else if (!time_to_die) + goto do_select; } /* Next, check to see if it's a new client connecting @@ -1139,8 +1217,10 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds); * thread that the &readfds needs to be refreshed with more * current data. */ - if (time_to_die) + if (time_to_die) { + end_critical_section(S_I_WANNA_SELECT); break; + } if (FD_ISSET(rescan[0], &readfds)) { read(rescan[0], &junk, 1); @@ -1187,6 +1267,7 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds); dead_session_purge(); if ((time(NULL) - last_timer) > 60L) { last_timer = time(NULL); + cdb_release_handles(); /* suggested by Justin Case */ PerformSessionHooks(EVT_TIMER); } @@ -1194,8 +1275,9 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds); } /* If control reaches this point, the server is shutting down */ - master_cleanup(); --num_threads; - pthread_exit(NULL); + return NULL; } + + diff --git a/citadel/sysdep_decls.h b/citadel/sysdep_decls.h index 13d5c931f..1398f30b0 100644 --- a/citadel/sysdep_decls.h +++ b/citadel/sysdep_decls.h @@ -19,8 +19,9 @@ void *sd_context_loop (struct CitContext *con); void start_daemon (int do_close_stdio); void cmd_nset (char *cmdbuf); int convert_login (char *NameToConvert); -void worker_thread (void); +void *worker_thread (void *arg); inline void become_session(struct CitContext *which_con); void CtdlRedirectOutput(FILE *fp, int sock); extern int num_sessions; +extern int time_to_die; -- 2.30.2