From 12a091b625c28d94e895901c900f4fdbd0776f7f Mon Sep 17 00:00:00 2001 From: Dave West Date: Sat, 12 Dec 2009 00:41:00 +0000 Subject: [PATCH] threads are using signals and the GC code is #ifdef if available. This comit confuses me since I thought it was already in. If there are problems back it out imediately. --- citadel/citserver.c | 2 +- citadel/context.h | 2 + citadel/include/ctdl_module.h | 13 ++ citadel/server_main.c | 7 + citadel/sysdep.c | 245 ++++++++++++++++++++++------------ citadel/sysdep_decls.h | 2 +- citadel/threads.c | 17 +-- 7 files changed, 196 insertions(+), 92 deletions(-) diff --git a/citadel/citserver.c b/citadel/citserver.c index 4a86604ad..43adab030 100644 --- a/citadel/citserver.c +++ b/citadel/citserver.c @@ -243,7 +243,7 @@ void master_cleanup(int exitcode) { if (restart_server != 0) exit(1); - if ((running_as_daemon != 0) && (exitcode == 0)) + if ((running_as_daemon != 0) && ((exitcode == 0) )) exitcode = CTDLEXIT_SHUTDOWN; exit(exitcode); } diff --git a/citadel/context.h b/citadel/context.h index ecf08a0ad..24cfc6861 100644 --- a/citadel/context.h +++ b/citadel/context.h @@ -43,6 +43,7 @@ struct CitContext { unsigned cs_flags; /* miscellaneous flags */ void (*h_command_function) (void) ; /* service command function */ void (*h_async_function) (void) ; /* do async msgs function */ + void (*h_greeting_function) (void) ; /* greeting function for session startup */ int is_async; /* Nonzero if client accepts async msgs */ int async_waiting; /* Nonzero if there are async msgs waiting */ int input_waiting; /* Nonzero if there is client input waiting */ @@ -125,6 +126,7 @@ typedef struct CitContext CitContext; */ enum { CON_IDLE, /* This context is doing nothing */ + CON_STARTING, /* This context needs the greeting outputting */ CON_READY, /* This context needs attention */ CON_EXECUTING /* This context is bound to a thread */ }; diff --git a/citadel/include/ctdl_module.h b/citadel/include/ctdl_module.h index fbb632e69..e4973849a 100644 --- a/citadel/include/ctdl_module.h +++ b/citadel/include/ctdl_module.h @@ -4,6 +4,19 @@ #define CTDL_MODULE_H #include "sysdep.h" + +#ifdef HAVE_GC +#define GC_THREADS +#define GC_REDIRECT_TO_LOCAL +#include +#else +#define GC_MALLOC malloc +#define GC_MALLOC_ATOMIC malloc +#define GC_FREE free +#define GC_REALLOC realloc +#endif + + #include #include #include diff --git a/citadel/server_main.c b/citadel/server_main.c index 66093245c..62a06ccd1 100644 --- a/citadel/server_main.c +++ b/citadel/server_main.c @@ -99,6 +99,13 @@ int main(int argc, char **argv) eCrashParameters params; // eCrashSymbolTable symbol_table; #endif + +#ifdef HAVE_GC + GC_INIT(); + GC_find_leak = 1; +#endif + + /* initialise semaphores here. Patch by Matt and davew * its called here as they are needed by CtdlLogPrintf for thread safety */ diff --git a/citadel/sysdep.c b/citadel/sysdep.c index 0d1b058c6..13c5c9089 100644 --- a/citadel/sysdep.c +++ b/citadel/sysdep.c @@ -159,11 +159,10 @@ volatile int restart_server = 0; volatile int running_as_daemon = 0; static RETSIGTYPE signal_cleanup(int signum) { -#ifdef THREADS_USESIGNALS + if (CT) CT->signal = signum; else -#endif { CtdlLogPrintf(CTDL_DEBUG, "Caught signal %d; shutting down.\n", signum); exit_signal = signum; @@ -480,13 +479,25 @@ int client_write(char *buf, int nbytes) FD_ZERO(&wset); FD_SET(Ctx->client_socket, &wset); if (select(1, NULL, &wset, NULL, NULL) == -1) { - CtdlLogPrintf(CTDL_ERR, - "client_write(%d bytes) select failed: %s (%d)\n", - nbytes - bytes_written, - strerror(errno), errno); - cit_backtrace(); - Ctx->kill_me = 1; - return -1; + if (errno == EINTR) + { + CtdlLogPrintf(CTDL_DEBUG, "client_write(%d bytes) select() interrupted.\n", nbytes-bytes_written); + if (CtdlThreadCheckStop()) { + CC->kill_me = 1; + return (-1); + } else { + /* can't trust fd's and stuff so we need to re-create them */ + continue; + } + } else { + CtdlLogPrintf(CTDL_ERR, + "client_write(%d bytes) select failed: %s (%d)\n", + nbytes - bytes_written, + strerror(errno), errno); + cit_backtrace(); + Ctx->kill_me = 1; + return -1; + } } } @@ -559,7 +570,17 @@ int client_read_to(char *buf, int bytes, int timeout) { if (errno == EINTR) { - CtdlLogPrintf(CTDL_DEBUG, "Interrupted select().\n"); + CtdlLogPrintf(CTDL_DEBUG, "Interrupted select() in client_read_to().\n"); + if (CtdlThreadCheckStop()) { + CC->kill_me = 1; + return (-1); + } else { + /* can't trust fd's and stuff so we need to re-create them */ + continue; + } + } + else { + CtdlLogPrintf(CTDL_DEBUG, "Failed select() in client_read_to().\n"); CC->kill_me = 1; return (-1); } @@ -866,18 +887,13 @@ int convert_login(char NameToConvert[]) { */ void *worker_thread(void *arg) { - int i; int highest; CitContext *ptr; CitContext *bind_me = NULL; fd_set readfds; int retval = 0; - CitContext *con= NULL; /* Temporary context pointer */ - struct ServiceFunctionHook *serviceptr; - int ssock; /* Descriptor for client socket */ struct timeval tv; int force_purge = 0; - int m; while (!CtdlThreadCheckStop()) { @@ -904,6 +920,11 @@ do_select: force_purge = 0; if ((bind_me == NULL) && (ptr->state == CON_READY)) { bind_me = ptr; ptr->state = CON_EXECUTING; + break; + } + if ((bind_me == NULL) && (ptr->state == CON_STARTING)) { + bind_me = ptr; + break; } } end_critical_section(S_SESSION_TABLE); @@ -917,20 +938,134 @@ do_select: force_purge = 0; * ahead and get ready to select(). */ + if (!CtdlThreadCheckStop()) { + tv.tv_sec = 1; /* wake up every second if no input */ + tv.tv_usec = 0; + retval = CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv); + } + else + return NULL; + + /* Now figure out who made this select() unblock. + * First, check for an error or exit condition. + */ + if (retval < 0) { + if (errno == EBADF) { + CtdlLogPrintf(CTDL_NOTICE, "select() failed: (%s)\n", + strerror(errno)); + goto do_select; + } + if (errno != EINTR) { + CtdlLogPrintf(CTDL_EMERG, "Exiting (%s)\n", strerror(errno)); + CtdlThreadStopAll(); + continue; + } else { + CtdlLogPrintf(CTDL_DEBUG, "Interrupted CtdlThreadSelect.\n"); + if (CtdlThreadCheckStop()) return(NULL); + goto do_select; + } + } + else if(retval == 0) { + if (CtdlThreadCheckStop()) return(NULL); + } + + /* It must be a client socket. Find a context that has data + * waiting on its socket *and* is in the CON_IDLE state. Any + * active sockets other than our chosen one are marked as + * CON_READY so the next thread that comes around can just bind + * to one without having to select() again. + */ + begin_critical_section(S_SESSION_TABLE); + for (ptr = ContextList; ptr != NULL; ptr = ptr->next) { + if ( (FD_ISSET(ptr->client_socket, &readfds)) + && (ptr->state != CON_EXECUTING) ) { + ptr->input_waiting = 1; + if (!bind_me) { + bind_me = ptr; /* I choose you! */ + bind_me->state = CON_EXECUTING; + } + else { + ptr->state = CON_READY; + } + } + } + end_critical_section(S_SESSION_TABLE); + +SKIP_SELECT: + /* We're bound to a session */ + if (bind_me != NULL) { + become_session(bind_me); + + if (bind_me->state == CON_STARTING) { + bind_me->state = CON_EXECUTING; + begin_session(bind_me); + bind_me->h_greeting_function(); + } + /* If the client has sent a command, execute it. */ + if (CC->input_waiting) { + CC->h_command_function(); + CC->input_waiting = 0; + } + + /* If there are asynchronous messages waiting and the + * client supports it, do those now */ + if ((CC->is_async) && (CC->async_waiting) + && (CC->h_async_function != NULL)) { + CC->h_async_function(); + CC->async_waiting = 0; + } + + force_purge = CC->kill_me; + become_session(NULL); + bind_me->state = CON_IDLE; + } + + dead_session_purge(force_purge); + do_housekeeping(); + } + /* If control reaches this point, the server is shutting down */ + return(NULL); +} + + + + +/* + * A function to handle selecting on master sockets. + * In other words it handles new connections. + * It is a thread. + */ +void *select_on_master (void *arg) +{ + struct ServiceFunctionHook *serviceptr; + fd_set master_fds; + int highest; + struct timeval tv; + int ssock; /* Descriptor for client socket */ + CitContext *con= NULL; /* Temporary context pointer */ + int m; + int i; + int retval; + + while (!CtdlThreadCheckStop()) { + /* Initialize the fdset. */ + FD_ZERO(&master_fds); + highest = 0; + /* First, add the various master sockets to the fdset. */ for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) { m = serviceptr->msock; - FD_SET(m, &readfds); + FD_SET(m, &master_fds); if (m > highest) { highest = m; } } if (!CtdlThreadCheckStop()) { - tv.tv_sec = 1; /* wake up every second if no input */ + tv.tv_sec = 60; /* wake up every second if no input */ tv.tv_usec = 0; - retval = CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv); + retval = CtdlThreadSelect(highest + 1, &master_fds, NULL, NULL, &tv); } else return NULL; @@ -942,7 +1077,7 @@ do_select: force_purge = 0; if (errno == EBADF) { CtdlLogPrintf(CTDL_NOTICE, "select() failed: (%s)\n", strerror(errno)); - goto do_select; + continue; } if (errno != EINTR) { CtdlLogPrintf(CTDL_EMERG, "Exiting (%s)\n", strerror(errno)); @@ -950,12 +1085,12 @@ do_select: force_purge = 0; } else { CtdlLogPrintf(CTDL_DEBUG, "Interrupted CtdlThreadSelect.\n"); if (CtdlThreadCheckStop()) return(NULL); - goto do_select; + continue; } } else if(retval == 0) { if (CtdlThreadCheckStop()) return(NULL); - goto SKIP_SELECT; + continue; } /* Next, check to see if it's a new client connecting * on a master socket. @@ -963,7 +1098,7 @@ do_select: force_purge = 0; else for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) { - if (FD_ISSET(serviceptr->msock, &readfds)) { + if (FD_ISSET(serviceptr->msock, &master_fds)) { ssock = accept(serviceptr->msock, NULL, 0); if (ssock >= 0) { CtdlLogPrintf(CTDL_DEBUG, @@ -991,6 +1126,7 @@ do_select: force_purge = 0; serviceptr->h_command_function; con->h_async_function = serviceptr->h_async_function; + con->h_greeting_function = serviceptr->h_greeting_function; con->ServiceName = serviceptr->ServiceName; @@ -1004,75 +1140,20 @@ do_select: force_purge = 0; SO_REUSEADDR, &i, sizeof(i)); - become_session(con); - begin_session(con); - serviceptr->h_greeting_function(); - become_session(NULL); - con->state = CON_IDLE; - retval--; - if (retval) - CtdlLogPrintf (CTDL_DEBUG, "Select said more than 1 fd to handle but we only handle one\n"); - goto do_select; - } - } - } + con->state = CON_STARTING; - /* It must be a client socket. Find a context that has data - * waiting on its socket *and* is in the CON_IDLE state. Any - * active sockets other than our chosen one are marked as - * CON_READY so the next thread that comes around can just bind - * to one without having to select() again. - */ - begin_critical_section(S_SESSION_TABLE); - for (ptr = ContextList; ptr != NULL; ptr = ptr->next) { - if ( (FD_ISSET(ptr->client_socket, &readfds)) - && (ptr->state != CON_EXECUTING) ) { - ptr->input_waiting = 1; - if (!bind_me) { - bind_me = ptr; /* I choose you! */ - bind_me->state = CON_EXECUTING; - } - else { - ptr->state = CON_READY; + retval--; + if (retval == 0) + break; } } } - end_critical_section(S_SESSION_TABLE); - -SKIP_SELECT: - /* We're bound to a session */ - if (bind_me != NULL) { - become_session(bind_me); - - /* If the client has sent a command, execute it. */ - if (CC->input_waiting) { - CC->h_command_function(); - CC->input_waiting = 0; - } - - /* If there are asynchronous messages waiting and the - * client supports it, do those now */ - if ((CC->is_async) && (CC->async_waiting) - && (CC->h_async_function != NULL)) { - CC->h_async_function(); - CC->async_waiting = 0; - } - - force_purge = CC->kill_me; - become_session(NULL); - bind_me->state = CON_IDLE; - } - - dead_session_purge(force_purge); - do_housekeeping(); } - /* If control reaches this point, the server is shutting down */ - return(NULL); + return NULL; } - /* * SyslogFacility() * Translate text facility name to syslog.h defined value. diff --git a/citadel/sysdep_decls.h b/citadel/sysdep_decls.h index 3c463f755..e61cc4096 100644 --- a/citadel/sysdep_decls.h +++ b/citadel/sysdep_decls.h @@ -70,7 +70,7 @@ int convert_login (char *NameToConvert); void *worker_thread (void *arg); void init_master_fdset(void); void create_worker(void); - +void *select_on_master (void *arg); extern volatile int exit_signal; extern volatile int shutdown_and_halt; diff --git a/citadel/threads.c b/citadel/threads.c index 2b419848b..a678d60a3 100644 --- a/citadel/threads.c +++ b/citadel/threads.c @@ -345,10 +345,9 @@ void CtdlThreadStopAll(void) GC_thread->state = CTDL_THREAD_STOP_REQ; while(this_thread) { -#ifdef THREADS_USESIGNALS if (!citthread_equal(this_thread->tid, GC_thread->tid)) citthread_kill(this_thread->tid, SIGHUP); -#endif + ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ); citthread_cond_signal(&this_thread->ThreadCond); citthread_cond_signal(&this_thread->SleepCond); @@ -510,13 +509,13 @@ int CtdlThreadCheckStop(void) state = CT->state; -#ifdef THREADS_USESIGNALS if (CT->signal) { CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" caught signal %d.\n", CT->name, CT->signal); + if (CT->signal == SIGHUP) + CT->state = CTDL_THREAD_STOP_REQ; CT->signal = 0; } -#endif if(state == CTDL_THREAD_STOP_REQ) { CT->state = CTDL_THREAD_STOPPING; @@ -546,10 +545,10 @@ void CtdlThreadStop(CtdlThreadNode *thread) return; if (!(this_thread->thread_func)) return; // Don't stop garbage collector -#ifdef THREADS_USESIGNALS + if (!citthread_equal(this_thread->tid, GC_thread->tid)) citthread_kill(this_thread->tid, SIGHUP); -#endif + ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ); citthread_cond_signal(&this_thread->ThreadCond); citthread_cond_signal(&this_thread->SleepCond); @@ -663,7 +662,7 @@ void CtdlThreadGC (void) CtdlThreadNode *this_thread, *that_thread; int workers = 0, sys_workers; int ret=0; - + begin_critical_section(S_THREAD_LIST); /* Handle exiting of garbage collector thread */ @@ -1267,8 +1266,10 @@ void go_threading(void) ctdl_thread_internal_init(); /* Second call to module init functions now that threading is up */ - if (!statcount) + if (!statcount) { initialise_modules(1); + CtdlThreadCreate("select_on_master", CTDLTHREAD_BIGSTACK, select_on_master, NULL); + } else { CtdlLogPrintf(CTDL_EMERG, "Running connection simulation stats\n"); gettimeofday(&start, NULL); -- 2.30.2