From ff9480f7250d97d132a4f4e8623a11db48e07e4c Mon Sep 17 00:00:00 2001 From: Dave West Date: Sun, 13 Jan 2008 16:40:49 +0000 Subject: [PATCH] do_hosekeeping is now a seperate thread. This releases worker threads from worrying about mundane tasks. Some other code regarding experimental select() in threads. Its turned off at the moment. --- citadel/housekeeping.c | 20 +- citadel/housekeeping.h | 2 +- citadel/server.h | 15 + citadel/sysdep.c | 5 +- citadel/sysdep_decls.h | 2 + citadel/threads.c | 1210 +++++++++++++++++++++++----------------- 6 files changed, 720 insertions(+), 534 deletions(-) diff --git a/citadel/housekeeping.c b/citadel/housekeeping.c index 742fb861b..bde6634b9 100644 --- a/citadel/housekeeping.c +++ b/citadel/housekeeping.c @@ -133,7 +133,7 @@ void check_ref_counts(void) { * only allow housekeeping to execute once per minute, and we only allow one * instance to run at a time. */ -void do_housekeeping(void) { +void *do_housekeeping(void *args) { static int housekeeping_in_progress = 0; static time_t last_timer = 0L; int do_housekeeping_now = 0; @@ -146,29 +146,36 @@ void do_housekeeping(void) { * S_HOUSEKEEPING critical section because it eliminates the need to * potentially have multiple concurrent mutexes in progress. */ - begin_critical_section(S_HOUSEKEEPING); + while (!CtdlThreadCheckStop()) + { + CtdlThreadName("House keeping - sleeping"); + CtdlThreadSleep(1); + +/* begin_critical_section(S_HOUSEKEEPING); if (housekeeping_in_progress == 0) { do_housekeeping_now = 1; housekeeping_in_progress = 1; +*/ now = time(NULL); if ( (now - last_timer) > (time_t)60 ) { do_perminute_housekeeping_now = 1; last_timer = time(NULL); } +/* } end_critical_section(S_HOUSEKEEPING); if (do_housekeeping_now == 0) { return; } - +*/ /* * Ok, at this point we've made the decision to run the housekeeping * loop. Everything below this point is real work. */ /* First, do the "as often as needed" stuff... */ - old_name = CtdlThreadName("House Keeping - Journal"); + CtdlThreadName("House Keeping - Journal"); JournalRunQueue(); CtdlThreadName("House Keeping - EVT_HOUSE"); @@ -176,14 +183,15 @@ void do_housekeeping(void) { /* Then, do the "once per minute" stuff... */ if (do_perminute_housekeeping_now) { + do_perminute_housekeeping_now = 0; cdb_check_handles(); /* suggested by Justin Case */ CtdlThreadName("House Keeping - EVT_TIMER"); PerformSessionHooks(EVT_TIMER); /* Run any timer hooks */ } - + } /* * All done. */ housekeeping_in_progress = 0; - CtdlThreadName(old_name); + return NULL; } diff --git a/citadel/housekeeping.h b/citadel/housekeeping.h index e7048dc5b..448c6fdca 100644 --- a/citadel/housekeeping.h +++ b/citadel/housekeeping.h @@ -2,4 +2,4 @@ void terminate_idle_sessions(void); void check_sched_shutdown(void); void check_ref_counts(void); -void do_housekeeping(void); +void *do_housekeeping(void *args); diff --git a/citadel/server.h b/citadel/server.h index 01e513554..9c5fabd79 100644 --- a/citadel/server.h +++ b/citadel/server.h @@ -16,6 +16,17 @@ #include #endif +#if TIME_WITH_SYS_TIME +# include +# include +#else +# if HAVE_SYS_TIME_H +# include +# else +# include +# endif +#endif + /* * New format for a message in memory */ @@ -78,6 +89,7 @@ struct CitContext { unsigned cs_flags; /* miscellaneous flags */ void (*h_command_function) (void) ; /* service command function */ void (*h_async_function) (void) ; /* do async msgs function */ + void (*h_greeting_function) (void) ; /* service startup function */ int is_async; /* Nonzero if client accepts async msgs */ int async_waiting; /* Nonzero if there are async msgs waiting */ int input_waiting; /* Nonzero if there is client input waiting */ @@ -143,6 +155,8 @@ struct CitContext { struct cit_ical *CIT_ICAL; /* calendaring data */ struct ma_info *ma; /* multipart/alternative data */ const char* ServiceName; /**< whats our actual purpose? */ + + struct timeval client_expires_at; /** When this client will expire */ }; typedef struct CitContext t_context; @@ -159,6 +173,7 @@ typedef struct CitContext t_context; */ enum { CON_IDLE, /* This context is doing nothing */ + CON_START, /* This context is starting up */ CON_READY, /* This context needs attention */ CON_EXECUTING /* This context is bound to a thread */ }; diff --git a/citadel/sysdep.c b/citadel/sysdep.c index b89680c9e..7bc8741c7 100644 --- a/citadel/sysdep.c +++ b/citadel/sysdep.c @@ -234,6 +234,9 @@ void init_sysdep(void) { // sigaddset(&set, SIGILL); we want core dumps // sigaddset(&set, SIGBUS); sigprocmask(SIG_UNBLOCK, &set, NULL); + sigemptyset(&set); + sigaddset(&set, SIGUSR1); + sigprocmask(SIG_BLOCK, &set, NULL); signal(SIGINT, signal_cleanup); signal(SIGQUIT, signal_cleanup); @@ -454,6 +457,7 @@ struct CitContext *CreateNewContext(void) { if (me->next != NULL) { me->next->prev = me; } + me->client_expires_at.tv_sec = config.c_sleeping; ++num_sessions; end_critical_section(S_SESSION_TABLE); return (me); @@ -1256,7 +1260,6 @@ SKIP_SELECT: } dead_session_purge(force_purge); - do_housekeeping(); } /* If control reaches this point, the server is shutting down */ return(NULL); diff --git a/citadel/sysdep_decls.h b/citadel/sysdep_decls.h index 7d02ba74b..5737ccc5a 100644 --- a/citadel/sysdep_decls.h +++ b/citadel/sysdep_decls.h @@ -145,8 +145,10 @@ typedef pthread_attr_t citthread_attr_t; #define citthread_kill pthread_kill #define citthread_cond_signal pthread_cond_signal +#define citthread_cond_broadcast pthread_cond_broadcast #define citthread_cancel pthread_cancel #define citthread_cond_timedwait pthread_cond_timedwait +#define citthread_cond_wait pthread_cond_wait #define citthread_equal pthread_equal #define citthread_self pthread_self #define citthread_create pthread_create diff --git a/citadel/threads.c b/citadel/threads.c index 7d689ca6e..6e95ed3b3 100644 --- a/citadel/threads.c +++ b/citadel/threads.c @@ -12,6 +12,7 @@ #include #include #include +#include #if TIME_WITH_SYS_TIME # include @@ -50,8 +51,8 @@ * remove the need for the calls to eCrashRegisterThread and friends */ -static int num_threads = 0; /* Current number of threads */ -static int num_workers = 0; /* Current number of worker threads */ +static int num_threads = 0; /* Current number of threads */ +static int num_workers = 0; /* Current number of worker threads */ CtdlThreadNode *CtdlThreadList = NULL; CtdlThreadNode *CtdlThreadSchedList = NULL; @@ -64,6 +65,9 @@ citthread_key_t ThreadKey; citthread_mutex_t Critters[MAX_SEMAPHORES]; /* Things needing locking */ +int idle_workers = 0; +citthread_cond_t worker_block; +citthread_mutex_t worker_block_mutex; void InitialiseSemaphores(void) @@ -71,7 +75,7 @@ void InitialiseSemaphores(void) int i; /* Set up a bunch of semaphores to be used for critical sections */ - for (i=0; icursors, 0, sizeof tsd->cursors); tsd->self = NULL; - + citthread_setspecific(ThreadKey, tsd); } @@ -193,16 +201,14 @@ void ctdl_thread_internal_cleanup(void) { int i; CtdlThreadNode *this_thread, *that_thread; - - for (i=0; inext; citthread_mutex_destroy(&that_thread->ThreadMutex); @@ -219,64 +225,70 @@ void ctdl_thread_internal_init(void) { CtdlThreadNode *this_thread; int ret = 0; - + GC_thread = citthread_self(); - CtdlThreadStates[CTDL_THREAD_INVALID] = strdup ("Invalid Thread"); + CtdlThreadStates[CTDL_THREAD_INVALID] = strdup("Invalid Thread"); CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread"); - CtdlThreadStates[CTDL_THREAD_CREATE] = strdup("Thread being Created"); - CtdlThreadStates[CTDL_THREAD_CANCELLED] = strdup("Thread Cancelled"); + CtdlThreadStates[CTDL_THREAD_CREATE] = + strdup("Thread being Created"); + CtdlThreadStates[CTDL_THREAD_CANCELLED] = + strdup("Thread Cancelled"); CtdlThreadStates[CTDL_THREAD_EXITED] = strdup("Thread Exited"); CtdlThreadStates[CTDL_THREAD_STOPPING] = strdup("Thread Stopping"); - CtdlThreadStates[CTDL_THREAD_STOP_REQ] = strdup("Thread Stop Requested"); + CtdlThreadStates[CTDL_THREAD_STOP_REQ] = + strdup("Thread Stop Requested"); CtdlThreadStates[CTDL_THREAD_SLEEPING] = strdup("Thread Sleeping"); CtdlThreadStates[CTDL_THREAD_RUNNING] = strdup("Thread Running"); CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked"); - + /* Get ourself a thread entry */ this_thread = malloc(sizeof(CtdlThreadNode)); if (this_thread == NULL) { - CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n"); + CtdlLogPrintf(CTDL_EMERG, + "Thread system, can't allocate CtdlThreadNode, exiting\n"); return; } // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready. - memset (this_thread, 0, sizeof(CtdlThreadNode)); - - citthread_mutex_init (&(this_thread->ThreadMutex), NULL); - citthread_cond_init (&(this_thread->ThreadCond), NULL); - citthread_mutex_init (&(this_thread->SleepMutex), NULL); - citthread_cond_init (&(this_thread->SleepCond), NULL); - + memset(this_thread, 0, sizeof(CtdlThreadNode)); + + citthread_mutex_init(&(this_thread->ThreadMutex), NULL); + citthread_cond_init(&(this_thread->ThreadCond), NULL); + citthread_mutex_init(&(this_thread->SleepMutex), NULL); + citthread_cond_init(&(this_thread->SleepCond), NULL); + /* We are garbage collector so create us as running */ this_thread->state = CTDL_THREAD_RUNNING; - + if ((ret = citthread_attr_init(&this_thread->attr))) { - CtdlLogPrintf(CTDL_EMERG, "Thread system, citthread_attr_init: %s\n", strerror(ret)); + CtdlLogPrintf(CTDL_EMERG, + "Thread system, citthread_attr_init: %s\n", + strerror(ret)); free(this_thread); return; } this_thread->name = "Garbage Collection Thread"; - + this_thread->tid = GC_thread; CT = this_thread; - - num_threads++; // Increase the count of threads in the system. + + num_threads++; // Increase the count of threads in the system. this_thread->next = CtdlThreadList; CtdlThreadList = this_thread; if (this_thread->next) this_thread->next->prev = this_thread; /* Set up start times */ - gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */ - memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval)); /* Changed state so mark it. */ + gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */ + memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof(struct timeval)); /* Changed state so mark it. */ } /* * A function to update a threads load averages */ - void ctdl_thread_internal_update_avgs(CtdlThreadNode *this_thread) - { +void ctdl_thread_internal_update_avgs(CtdlThreadNode * this_thread) +{ struct timeval now, result; double last_duration; @@ -285,33 +297,42 @@ void ctdl_thread_internal_init(void) /* I don't think these mutex's are needed here */ citthread_mutex_lock(&this_thread->ThreadMutex); // result now has a timeval for the time we spent in the last state since we last updated - last_duration = (double)result.tv_sec + ((double)result.tv_usec / (double) 1000000); + last_duration = + (double) result.tv_sec + + ((double) result.tv_usec / (double) 1000000); if (this_thread->state == CTDL_THREAD_SLEEPING) this_thread->avg_sleeping += last_duration; if (this_thread->state == CTDL_THREAD_RUNNING) this_thread->avg_running += last_duration; if (this_thread->state == CTDL_THREAD_BLOCKED) this_thread->avg_blocked += last_duration; - memcpy (&this_thread->last_state_change, &now, sizeof (struct timeval)); + memcpy(&this_thread->last_state_change, &now, + sizeof(struct timeval)); citthread_mutex_unlock(&this_thread->ThreadMutex); } /* * A function to chenge the state of a thread */ -void ctdl_thread_internal_change_state (CtdlThreadNode *this_thread, enum CtdlThreadState new_state) +void ctdl_thread_internal_change_state(CtdlThreadNode * this_thread, + enum CtdlThreadState new_state) { /* * Wether we change state or not we need update the load values */ ctdl_thread_internal_update_avgs(this_thread); /* This mutex not needed here? */ - citthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */ - if ((new_state == CTDL_THREAD_STOP_REQ) && (this_thread->state > CTDL_THREAD_STOP_REQ)) + citthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */ + if ((new_state == CTDL_THREAD_STOP_REQ) + && (this_thread->state > CTDL_THREAD_STOP_REQ)) this_thread->state = new_state; - if (((new_state == CTDL_THREAD_SLEEPING) || (new_state == CTDL_THREAD_BLOCKED)) && (this_thread->state == CTDL_THREAD_RUNNING)) + if (((new_state == CTDL_THREAD_SLEEPING) + || (new_state == CTDL_THREAD_BLOCKED)) + && (this_thread->state == CTDL_THREAD_RUNNING)) this_thread->state = new_state; - if ((new_state == CTDL_THREAD_RUNNING) && ((this_thread->state == CTDL_THREAD_SLEEPING) || (this_thread->state == CTDL_THREAD_BLOCKED))) + if ((new_state == CTDL_THREAD_RUNNING) + && ((this_thread->state == CTDL_THREAD_SLEEPING) + || (this_thread->state == CTDL_THREAD_BLOCKED))) this_thread->state = new_state; citthread_mutex_unlock(&this_thread->ThreadMutex); } @@ -324,23 +345,27 @@ void CtdlThreadStopAll(void) { //FIXME: The signalling of the condition should not be in the critical_section // We need to build a list of threads we are going to signal and then signal them afterwards - + CtdlThreadNode *this_thread; - + begin_critical_section(S_THREAD_LIST); this_thread = CtdlThreadList; - while(this_thread) - { + while (this_thread) { #ifdef THREADS_USESIGNALS citthread_killl(this_thread->tid, SIGHUP); #endif - ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ); + citthread_kill(this_thread->tid, SIGUSR1); + ctdl_thread_internal_change_state(this_thread, + CTDL_THREAD_STOP_REQ); citthread_cond_signal(&this_thread->ThreadCond); citthread_cond_signal(&this_thread->SleepCond); - CtdlLogPrintf(CTDL_DEBUG, "Thread system stopping thread \"%s\" (%ld).\n", this_thread->name, this_thread->tid); + CtdlLogPrintf(CTDL_DEBUG, + "Thread system stopping thread \"%s\" (%ld).\n", + this_thread->name, this_thread->tid); this_thread = this_thread->next; } end_critical_section(S_THREAD_LIST); + citthread_cond_broadcast(&worker_block); } @@ -350,15 +375,13 @@ void CtdlThreadStopAll(void) void CtdlThreadWakeAll(void) { CtdlThreadNode *this_thread; - + CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n"); - + begin_critical_section(S_THREAD_LIST); this_thread = CtdlThreadList; - while(this_thread) - { - if (!this_thread->thread_func) - { + while (this_thread) { + if (!this_thread->thread_func) { citthread_cond_signal(&this_thread->ThreadCond); citthread_cond_signal(&this_thread->SleepCond); } @@ -373,20 +396,20 @@ void CtdlThreadWakeAll(void) */ int CtdlThreadGetCount(void) { - return num_threads; + return num_threads; } int CtdlThreadGetWorkers(void) { - return num_workers; + return num_workers; } double CtdlThreadGetWorkerAvg(void) { double ret; - + begin_critical_section(S_THREAD_LIST); - ret = CtdlThreadWorkerAvg; + ret = CtdlThreadWorkerAvg; end_critical_section(S_THREAD_LIST); return ret; } @@ -394,9 +417,9 @@ double CtdlThreadGetWorkerAvg(void) double CtdlThreadGetLoadAvg(void) { double ret; - + begin_critical_section(S_THREAD_LIST); - ret = CtdlThreadLoadAvg; + ret = CtdlThreadLoadAvg; end_critical_section(S_THREAD_LIST); return ret; } @@ -411,45 +434,47 @@ double CtdlThreadGetLoadAvg(void) const char *CtdlThreadName(const char *name) { const char *old_name; - - if (!CT) - { - CtdlLogPrintf(CTDL_WARNING, "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n", name); + + if (!CT) { + CtdlLogPrintf(CTDL_WARNING, + "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n", + name); return NULL; } old_name = CT->name; if (name) CT->name = name; return (old_name); -} +} /* * A function to force a thread to exit */ -void CtdlThreadCancel(CtdlThreadNode *thread) +void CtdlThreadCancel(CtdlThreadNode * thread) { CtdlThreadNode *this_thread; - + if (!thread) this_thread = CT; else this_thread = thread; - if (!this_thread) - { - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n"); + if (!this_thread) { + CtdlLogPrintf(CTDL_EMERG, + "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n"); CtdlThreadStopAll(); return; } - - if (!this_thread->thread_func) - { - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n"); + + if (!this_thread->thread_func) { + CtdlLogPrintf(CTDL_EMERG, + "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n"); CtdlThreadStopAll(); return; } - - ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_CANCELLED); + + ctdl_thread_internal_change_state(this_thread, + CTDL_THREAD_CANCELLED); citthread_cancel(this_thread->tid); } @@ -460,27 +485,27 @@ void CtdlThreadCancel(CtdlThreadNode *thread) int CtdlThreadCheckStop(void) { int state; - - if (!CT) - { - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n"); + + if (!CT) { + CtdlLogPrintf(CTDL_EMERG, + "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n"); CtdlThreadStopAll(); return -1; } - + state = CT->state; #ifdef THREADS_USERSIGNALS if (CT->signal) - CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" caught signal %d.\n", CT->name, CT->signal); + CtdlLogPrintf(CTDL_DEBUG, + "Thread \"%s\" caught signal %d.\n", + CT->name, CT->signal); #endif - if(state == CTDL_THREAD_STOP_REQ) - { + if (state == CTDL_THREAD_STOP_REQ) { CT->state = CTDL_THREAD_STOPPING; return -1; - } - else if((state < CTDL_THREAD_STOP_REQ) && (state > CTDL_THREAD_CREATE)) - { + } else if ((state < CTDL_THREAD_STOP_REQ) + && (state > CTDL_THREAD_CREATE)) { return -1; } return 0; @@ -491,10 +516,10 @@ int CtdlThreadCheckStop(void) * A function to ask a thread to exit * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit */ -void CtdlThreadStop(CtdlThreadNode *thread) +void CtdlThreadStop(CtdlThreadNode * thread) { CtdlThreadNode *this_thread; - + if (!thread) this_thread = CT; else @@ -502,11 +527,12 @@ void CtdlThreadStop(CtdlThreadNode *thread) if (!this_thread) return; if (!(this_thread->thread_func)) - return; // Don't stop garbage collector + return; // Don't stop garbage collector #ifdef THREADS_USESIGNALS - citthread_kill(this_thread->tid, SIGHUP); + citthread_kill(this_thread->tid, SIGHUP); #endif - ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ); + ctdl_thread_internal_change_state(this_thread, + CTDL_THREAD_STOP_REQ); citthread_cond_signal(&this_thread->ThreadCond); citthread_cond_signal(&this_thread->SleepCond); } @@ -518,26 +544,27 @@ void CtdlThreadSleep(int secs) { struct timespec wake_time; struct timeval time_now; - - - if (!CT) - { - CtdlLogPrintf(CTDL_WARNING, "CtdlThreadSleep() called by something that is not a thread. Should we die?\n"); + + + if (!CT) { + CtdlLogPrintf(CTDL_WARNING, + "CtdlThreadSleep() called by something that is not a thread. Should we die?\n"); return; } - - memset (&wake_time, 0, sizeof(struct timespec)); + + memset(&wake_time, 0, sizeof(struct timespec)); gettimeofday(&time_now, NULL); wake_time.tv_sec = time_now.tv_sec + secs; wake_time.tv_nsec = time_now.tv_usec * 10; - ctdl_thread_internal_change_state (CT, CTDL_THREAD_SLEEPING); - - citthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */ - citthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex, &wake_time); + ctdl_thread_internal_change_state(CT, CTDL_THREAD_SLEEPING); + + citthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */ + citthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex, + &wake_time); citthread_mutex_unlock(&CT->ThreadMutex); - - ctdl_thread_internal_change_state (CT, CTDL_THREAD_RUNNING); + + ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING); } @@ -550,12 +577,13 @@ static void ctdl_internal_thread_cleanup(void *arg) * In here we were called by the current thread because it is exiting * NB. WE ARE THE CURRENT THREAD */ - CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n", CT->name, CT->tid); - - #ifdef HAVE_BACKTRACE + CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n", + CT->name, CT->tid); + +#ifdef HAVE_BACKTRACE eCrash_UnregisterThread(); - #endif - +#endif + citthread_mutex_lock(&CT->ThreadMutex); CT->state = CTDL_THREAD_EXITED; // needs to be last thing else house keeping will unlink us too early citthread_mutex_unlock(&CT->ThreadMutex); @@ -573,38 +601,43 @@ void ctdl_thread_internal_calc_loadavg(void) that_thread = CtdlThreadList; load_avg = 0; worker_avg = 0; - while(that_thread) - { + while (that_thread) { /* Update load averages */ ctdl_thread_internal_update_avgs(that_thread); citthread_mutex_lock(&that_thread->ThreadMutex); - that_thread->load_avg = (that_thread->avg_sleeping + that_thread->avg_running) / (that_thread->avg_sleeping + that_thread->avg_running + that_thread->avg_blocked) * 100; + that_thread->load_avg = + (that_thread->avg_sleeping + + that_thread->avg_running) / + (that_thread->avg_sleeping + that_thread->avg_running + + that_thread->avg_blocked) * 100; that_thread->avg_sleeping /= 2; that_thread->avg_running /= 2; that_thread->avg_blocked /= 2; load_avg += that_thread->load_avg; - if (that_thread->flags & CTDLTHREAD_WORKER) - { + if (that_thread->flags & CTDLTHREAD_WORKER) { worker_avg += that_thread->load_avg; workers++; } #ifdef WITH_THREADLOG - CtdlLogPrintf(CTDL_DEBUG, "CtdlThread, \"%s\" (%lu) \"%s\" %.2f %.2f %.2f %.2f\n", - that_thread->name, - that_thread->tid, - CtdlThreadStates[that_thread->state], - that_thread->avg_sleeping, - that_thread->avg_running, - that_thread->avg_blocked, - that_thread->load_avg); + CtdlLogPrintf(CTDL_DEBUG, + "CtdlThread, \"%s\" (%lu) \"%s\" %.2f %.2f %.2f %.2f\n", + that_thread->name, that_thread->tid, + CtdlThreadStates[that_thread->state], + that_thread->avg_sleeping, + that_thread->avg_running, + that_thread->avg_blocked, + that_thread->load_avg); #endif citthread_mutex_unlock(&that_thread->ThreadMutex); that_thread = that_thread->next; } - CtdlThreadLoadAvg = load_avg/num_threads; - CtdlThreadWorkerAvg = worker_avg/workers; + CtdlThreadLoadAvg = load_avg / num_threads; + CtdlThreadWorkerAvg = worker_avg / workers; #ifdef WITH_THREADLOG - CtdlLogPrintf(CTDL_INFO, "System load average %.2f, workers averag %.2f, threads %d, workers %d, sessions %d\n", CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads, num_workers, num_sessions); + CtdlLogPrintf(CTDL_INFO, + "System load average %.2f, workers averag %.2f, threads %d, workers %d, sessions %d\n", + CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads, + num_workers, num_sessions); #endif } @@ -613,65 +646,64 @@ void ctdl_thread_internal_calc_loadavg(void) * Garbage collection routine. * Gets called by main() in a loop to clean up the thread list periodically. */ -void CtdlThreadGC (void) +void CtdlThreadGC(void) { CtdlThreadNode *this_thread, *that_thread; int workers = 0, sys_workers; - int ret=0; - + int ret = 0; + begin_critical_section(S_THREAD_LIST); - + /* Handle exiting of garbage collector thread */ - if(num_threads == 1) + if (num_threads == 1) CtdlThreadList->state = CTDL_THREAD_EXITED; - + #ifdef WITH_THREADLOG - CtdlLogPrintf(CTDL_DEBUG, "Thread system running garbage collection.\n"); + CtdlLogPrintf(CTDL_DEBUG, + "Thread system running garbage collection.\n"); #endif /* * Woke up to do garbage collection */ this_thread = CtdlThreadList; - while(this_thread) - { + while (this_thread) { that_thread = this_thread; this_thread = this_thread->next; - + /* Do we need to clean up this thread? */ - if (that_thread->state != CTDL_THREAD_EXITED) - { - if(that_thread->flags & CTDLTHREAD_WORKER) + if (that_thread->state != CTDL_THREAD_EXITED) { + if (that_thread->flags & CTDLTHREAD_WORKER) workers++; /* Sanity check on number of worker threads */ continue; } - - if (citthread_equal(that_thread->tid, citthread_self()) && that_thread->thread_func) - { /* Sanity check */ + + if (citthread_equal(that_thread->tid, citthread_self()) && that_thread->thread_func) { /* Sanity check */ end_critical_section(S_THREAD_LIST); - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, a thread is trying to clean up after itself.\n"); + CtdlLogPrintf(CTDL_EMERG, + "Thread system PANIC, a thread is trying to clean up after itself.\n"); abort(); return; } - - if (num_threads <= 0) - { /* Sanity check */ + + if (num_threads <= 0) { /* Sanity check */ end_critical_section(S_THREAD_LIST); - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n"); + CtdlLogPrintf(CTDL_EMERG, + "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n"); abort(); return; } - if(that_thread->flags & CTDLTHREAD_WORKER) + if (that_thread->flags & CTDLTHREAD_WORKER) num_workers--; /* This is a wroker thread so reduce the count. */ num_threads--; /* If we are unlinking the list head then the next becomes the list head */ - if(that_thread->prev) + if (that_thread->prev) that_thread->prev->next = that_thread->next; else CtdlThreadList = that_thread->next; - if(that_thread->next) + if (that_thread->next) that_thread->next->prev = that_thread->prev; - + citthread_cond_signal(&that_thread->ThreadCond); citthread_cond_signal(&that_thread->SleepCond); // Make sure this thread is awake citthread_mutex_lock(&that_thread->ThreadMutex); // Make sure it has done what its doing @@ -681,19 +713,25 @@ void CtdlThreadGC (void) * Also makes sure the thread has cleaned up after itself before we remove it from the list * We can join on the garbage collector thread the join should just return EDEADLCK */ - ret = citthread_join (that_thread->tid, NULL); + ret = citthread_join(that_thread->tid, NULL); if (ret == EDEADLK) - CtdlLogPrintf(CTDL_DEBUG, "Garbage collection on own thread.\n"); + CtdlLogPrintf(CTDL_DEBUG, + "Garbage collection on own thread.\n"); else if (ret == EINVAL) - CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, that thread already joined on.\n"); + CtdlLogPrintf(CTDL_DEBUG, + "Garbage collection, that thread already joined on.\n"); else if (ret == ESRCH) - CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, no thread to join on.\n"); + CtdlLogPrintf(CTDL_DEBUG, + "Garbage collection, no thread to join on.\n"); else if (ret != 0) - CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, citthread_join returned an unknown error.\n"); + CtdlLogPrintf(CTDL_DEBUG, + "Garbage collection, citthread_join returned an unknown error.\n"); /* * Now we own that thread entry */ - CtdlLogPrintf(CTDL_INFO, "Garbage Collection for thread \"%s\" (%ld).\n", that_thread->name, that_thread->tid); + CtdlLogPrintf(CTDL_INFO, + "Garbage Collection for thread \"%s\" (%ld).\n", + that_thread->name, that_thread->tid); citthread_mutex_destroy(&that_thread->ThreadMutex); citthread_cond_destroy(&that_thread->ThreadCond); citthread_mutex_destroy(&that_thread->SleepMutex); @@ -703,27 +741,25 @@ void CtdlThreadGC (void) } sys_workers = num_workers; end_critical_section(S_THREAD_LIST); - + /* Sanity check number of worker threads */ - if (workers != sys_workers) - { + if (workers != sys_workers) { CtdlLogPrintf(CTDL_EMERG, - "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n", - workers, sys_workers - ); + "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n", + workers, sys_workers); abort(); } } - + /* * Runtime function for a Citadel Thread. * This initialises the threads environment and then calls the user supplied thread function * Note that this is the REAL thread function and wraps the users thread function. - */ -static void *ctdl_internal_thread_func (void *arg) + */ +static void *ctdl_internal_thread_func(void *arg) { CtdlThreadNode *this_thread; void *ret = NULL; @@ -734,53 +770,53 @@ static void *ctdl_internal_thread_func (void *arg) */ begin_critical_section(S_THREAD_LIST); this_thread = (CtdlThreadNode *) arg; - gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */ -// citthread_mutex_lock(&this_thread->ThreadMutex); - + gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */ +// citthread_mutex_lock(&this_thread->ThreadMutex); + // Register the cleanup function to take care of when we exit. citthread_cleanup_push(ctdl_internal_thread_cleanup, NULL); // Get our thread data structure CtdlThreadAllocTSD(); CT = this_thread; this_thread->pid = getpid(); - memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval)); /* Changed state so mark it. */ + memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof(struct timeval)); /* Changed state so mark it. */ /* Only change to running state if we weren't asked to stop during the create cycle * Other wise there is a window to allow this threads creation to continue to full grown and * therby prevent a shutdown of the server. */ -// citthread_mutex_unlock(&this_thread->ThreadMutex); - - if (!CtdlThreadCheckStop()) - { +// citthread_mutex_unlock(&this_thread->ThreadMutex); + + if (!CtdlThreadCheckStop()) { citthread_mutex_lock(&this_thread->ThreadMutex); this_thread->state = CTDL_THREAD_RUNNING; citthread_mutex_unlock(&this_thread->ThreadMutex); } end_critical_section(S_THREAD_LIST); - + // Register for tracing - #ifdef HAVE_BACKTRACE +#ifdef HAVE_BACKTRACE eCrash_RegisterThread(this_thread->name, 0); - #endif - +#endif + // Tell the world we are here - CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n", this_thread->name, this_thread->tid); + CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n", + this_thread->name, this_thread->tid); + + - - /* * run the thread to do the work but only if we haven't been asked to stop */ if (!CtdlThreadCheckStop()) - ret = (this_thread->thread_func)(this_thread->user_args); - + ret = (this_thread->thread_func) (this_thread->user_args); + /* * Our thread is exiting either because it wanted to end or because the server is stopping * We need to clean up */ citthread_cleanup_pop(1); // Execute our cleanup routine and remove it - - return(ret); + + return (ret); } @@ -789,28 +825,31 @@ static void *ctdl_internal_thread_func (void *arg) /* * Function to initialise an empty thread structure */ -CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode *this_thread, long flags) +CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode * + this_thread, long flags) { int ret = 0; - + // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready. - memset (this_thread, 0, sizeof(CtdlThreadNode)); - + memset(this_thread, 0, sizeof(CtdlThreadNode)); + /* Create the mutex's early so we can use them */ - citthread_mutex_init (&(this_thread->ThreadMutex), NULL); - citthread_cond_init (&(this_thread->ThreadCond), NULL); - citthread_mutex_init (&(this_thread->SleepMutex), NULL); - citthread_cond_init (&(this_thread->SleepCond), NULL); - + citthread_mutex_init(&(this_thread->ThreadMutex), NULL); + citthread_cond_init(&(this_thread->ThreadCond), NULL); + citthread_mutex_init(&(this_thread->SleepMutex), NULL); + citthread_cond_init(&(this_thread->SleepCond), NULL); + this_thread->state = CTDL_THREAD_CREATE; - + if ((ret = citthread_attr_init(&this_thread->attr))) { citthread_mutex_unlock(&this_thread->ThreadMutex); citthread_mutex_destroy(&(this_thread->ThreadMutex)); citthread_cond_destroy(&(this_thread->ThreadCond)); citthread_mutex_destroy(&(this_thread->SleepMutex)); citthread_cond_destroy(&(this_thread->SleepCond)); - CtdlLogPrintf(CTDL_EMERG, "Thread system, citthread_attr_init: %s\n", strerror(ret)); + CtdlLogPrintf(CTDL_EMERG, + "Thread system, citthread_attr_init: %s\n", + strerror(ret)); free(this_thread); return NULL; } @@ -819,20 +858,26 @@ CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode *this_thread, lo * otherwise the MIME parser crashes on FreeBSD, and the IMAP service * crashes on 64-bit Linux. */ - if (flags & CTDLTHREAD_BIGSTACK) - { + if (flags & CTDLTHREAD_BIGSTACK) { #ifdef WITH_THREADLOG - CtdlLogPrintf(CTDL_INFO, "Thread system. Creating BIG STACK thread.\n"); + CtdlLogPrintf(CTDL_INFO, + "Thread system. Creating BIG STACK thread.\n"); #endif - if ((ret = citthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) { + if ((ret = + citthread_attr_setstacksize(&this_thread->attr, + THREADSTACKSIZE))) { citthread_mutex_unlock(&this_thread->ThreadMutex); - citthread_mutex_destroy(&(this_thread->ThreadMutex)); + citthread_mutex_destroy(& + (this_thread-> + ThreadMutex)); citthread_cond_destroy(&(this_thread->ThreadCond)); - citthread_mutex_destroy(&(this_thread->SleepMutex)); + citthread_mutex_destroy(& + (this_thread->SleepMutex)); citthread_cond_destroy(&(this_thread->SleepCond)); citthread_attr_destroy(&this_thread->attr); - CtdlLogPrintf(CTDL_EMERG, "Thread system, citthread_attr_setstacksize: %s\n", - strerror(ret)); + CtdlLogPrintf(CTDL_EMERG, + "Thread system, citthread_attr_setstacksize: %s\n", + strerror(ret)); free(this_thread); return NULL; } @@ -843,38 +888,42 @@ CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode *this_thread, lo * because the creation didn't affect the load average. */ this_thread->avg_blocked = 2; - + return (this_thread); } - + /* * Internal function to create a thread. - */ -CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args) + */ +CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, + void *(*thread_func) (void + *arg), + void *args) { int ret = 0; CtdlThreadNode *this_thread; - if (num_threads >= 32767) - { - CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n"); + if (num_threads >= 32767) { + CtdlLogPrintf(CTDL_EMERG, + "Thread system. Thread list full.\n"); return NULL; } - + this_thread = malloc(sizeof(CtdlThreadNode)); if (this_thread == NULL) { - CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n"); + CtdlLogPrintf(CTDL_EMERG, + "Thread system, can't allocate CtdlThreadNode, exiting\n"); return NULL; } - + /* Initialise the thread structure */ - if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) - { + if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) { free(this_thread); - CtdlLogPrintf(CTDL_EMERG, "Thread system, can't initialise CtdlThreadNode, exiting\n"); + CtdlLogPrintf(CTDL_EMERG, + "Thread system, can't initialise CtdlThreadNode, exiting\n"); return NULL; } /* @@ -882,32 +931,32 @@ CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thre * first because most implimentations of threading can't create it in a stopped state * and it might want to do things with its structure that aren't initialised otherwise. */ - if(name) - { + if (name) { this_thread->name = name; - } - else - { + } else { this_thread->name = "Un-named Thread"; } - + this_thread->flags = flags; this_thread->thread_func = thread_func; this_thread->user_args = args; - -// citthread_mutex_lock(&this_thread->ThreadMutex); - + +// citthread_mutex_lock(&this_thread->ThreadMutex); + begin_critical_section(S_THREAD_LIST); /* * We pass this_thread into the thread as its args so that it can find out information * about itself and it has a bit of storage space for itself, not to mention that the REAL * thread function needs to finish off the setup of the structure */ - if ((ret = citthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0)) - { + if ((ret = + citthread_create(&this_thread->tid, &this_thread->attr, + ctdl_internal_thread_func, + this_thread) != 0)) { end_critical_section(S_THREAD_LIST); - CtdlLogPrintf(CTDL_ALERT, "Thread system, Can't create thread: %s\n", - strerror(ret)); + CtdlLogPrintf(CTDL_ALERT, + "Thread system, Can't create thread: %s\n", + strerror(ret)); citthread_mutex_unlock(&this_thread->ThreadMutex); citthread_mutex_destroy(&(this_thread->ThreadMutex)); citthread_cond_destroy(&(this_thread->ThreadCond)); @@ -917,9 +966,9 @@ CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thre free(this_thread); return NULL; } - - num_threads++; // Increase the count of threads in the system. - if(this_thread->flags & CTDLTHREAD_WORKER) + + num_threads++; // Increase the count of threads in the system. + if (this_thread->flags & CTDLTHREAD_WORKER) num_workers++; this_thread->next = CtdlThreadList; @@ -927,10 +976,10 @@ CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thre if (this_thread->next) this_thread->next->prev = this_thread; ctdl_thread_internal_calc_loadavg(); - -// citthread_mutex_unlock(&this_thread->ThreadMutex); + +// citthread_mutex_unlock(&this_thread->ThreadMutex); end_critical_section(S_THREAD_LIST); - + return this_thread; } @@ -940,10 +989,12 @@ CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thre * char *name = name to give to thread, if NULL, use generic name * int flags = flags to determine type of thread and standard facilities */ -CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args) +CtdlThreadNode *CtdlThreadCreate(char *name, long flags, + void *(*thread_func) (void *arg), + void *args) { CtdlThreadNode *ret = NULL; - + ret = ctdl_internal_create_thread(name, flags, thread_func, args); return ret; } @@ -953,27 +1004,30 @@ CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (v /* * Internal function to schedule a thread. * Must be called from within a S_THREAD_LIST critical section - */ -CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) (void *arg), void *args, time_t when) + */ +CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, + void *(*thread_func) (void *arg), + void *args, time_t when) { CtdlThreadNode *this_thread; - if (num_threads >= 32767) - { - CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n"); + if (num_threads >= 32767) { + CtdlLogPrintf(CTDL_EMERG, + "Thread system. Thread list full.\n"); return NULL; } - + this_thread = malloc(sizeof(CtdlThreadNode)); if (this_thread == NULL) { - CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n"); + CtdlLogPrintf(CTDL_EMERG, + "Thread system, can't allocate CtdlThreadNode, exiting\n"); return NULL; } /* Initialise the thread structure */ - if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) - { + if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) { free(this_thread); - CtdlLogPrintf(CTDL_EMERG, "Thread system, can't initialise CtdlThreadNode, exiting\n"); + CtdlLogPrintf(CTDL_EMERG, + "Thread system, can't initialise CtdlThreadNode, exiting\n"); return NULL; } @@ -982,19 +1036,16 @@ CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) * first because most implimentations of threading can't create it in a stopped state * and it might want to do things with its structure that aren't initialised otherwise. */ - if(name) - { + if (name) { this_thread->name = name; - } - else - { + } else { this_thread->name = "Un-named Thread"; } - + this_thread->flags = flags; this_thread->thread_func = thread_func; this_thread->user_args = args; - + /* * When to start this thread */ @@ -1006,28 +1057,33 @@ CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) if (this_thread->next) this_thread->next->prev = this_thread; end_critical_section(S_SCHEDULE_LIST); - + return this_thread; } -CtdlThreadNode *ctdl_thread_internal_start_scheduled (CtdlThreadNode *this_thread) +CtdlThreadNode *ctdl_thread_internal_start_scheduled(CtdlThreadNode * + this_thread) { int ret = 0; - -// citthread_mutex_lock(&that_thread->ThreadMutex); + +// citthread_mutex_lock(&that_thread->ThreadMutex); begin_critical_section(S_THREAD_LIST); /* * We pass this_thread into the thread as its args so that it can find out information * about itself and it has a bit of storage space for itself, not to mention that the REAL * thread function needs to finish off the setup of the structure */ - if ((ret = citthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0)) - { + if ((ret = + citthread_create(&this_thread->tid, &this_thread->attr, + ctdl_internal_thread_func, + this_thread) != 0)) { end_critical_section(S_THREAD_LIST); - CtdlLogPrintf(CTDL_DEBUG, "Failed to start scheduled thread \"%s\": %s\n", this_thread->name, strerror(ret)); -// citthread_mutex_unlock(&this_thread->ThreadMutex); + CtdlLogPrintf(CTDL_DEBUG, + "Failed to start scheduled thread \"%s\": %s\n", + this_thread->name, strerror(ret)); +// citthread_mutex_unlock(&this_thread->ThreadMutex); citthread_mutex_destroy(&(this_thread->ThreadMutex)); citthread_cond_destroy(&(this_thread->ThreadCond)); citthread_mutex_destroy(&(this_thread->SleepMutex)); @@ -1036,22 +1092,22 @@ CtdlThreadNode *ctdl_thread_internal_start_scheduled (CtdlThreadNode *this_threa free(this_thread); return NULL; } - - - num_threads++; // Increase the count of threads in the system. - if(this_thread->flags & CTDLTHREAD_WORKER) + + + num_threads++; // Increase the count of threads in the system. + if (this_thread->flags & CTDLTHREAD_WORKER) num_workers++; this_thread->next = CtdlThreadList; CtdlThreadList = this_thread; if (this_thread->next) this_thread->next->prev = this_thread; -// citthread_mutex_unlock(&that_thread->ThreadMutex); - +// citthread_mutex_unlock(&that_thread->ThreadMutex); + ctdl_thread_internal_calc_loadavg(); end_critical_section(S_THREAD_LIST); - - + + return this_thread; } @@ -1061,51 +1117,57 @@ void ctdl_thread_internal_check_scheduled(void) { CtdlThreadNode *this_thread, *that_thread; time_t now; - + if (try_critical_section(S_SCHEDULE_LIST)) - return; /* If this list is locked we wait till the next chance */ - + return; /* If this list is locked we wait till the next chance */ + now = time(NULL); - + #ifdef WITH_THREADLOG - CtdlLogPrintf(CTDL_DEBUG, "Checking for scheduled threads to start.\n"); + CtdlLogPrintf(CTDL_DEBUG, + "Checking for scheduled threads to start.\n"); #endif this_thread = CtdlThreadSchedList; - while(this_thread) - { + while (this_thread) { that_thread = this_thread; this_thread = this_thread->next; - - if (now > that_thread->when) - { + + if (now > that_thread->when) { /* Unlink from schedule list */ if (that_thread->prev) - that_thread->prev->next = that_thread->next; + that_thread->prev->next = + that_thread->next; else CtdlThreadSchedList = that_thread->next; if (that_thread->next) - that_thread->next->prev = that_thread->prev; - + that_thread->next->prev = + that_thread->prev; + that_thread->next = that_thread->prev = NULL; #ifdef WITH_THREADLOG - CtdlLogPrintf(CTDL_DEBUG, "About to start scheduled thread \"%s\".\n", that_thread->name); + CtdlLogPrintf(CTDL_DEBUG, + "About to start scheduled thread \"%s\".\n", + that_thread->name); #endif - if (CT->state > CTDL_THREAD_STOP_REQ) - { /* Only start it if the system is not stopping */ - if (ctdl_thread_internal_start_scheduled (that_thread)) - { + if (CT->state > CTDL_THREAD_STOP_REQ) { /* Only start it if the system is not stopping */ + if (ctdl_thread_internal_start_scheduled + (that_thread)) { #ifdef WITH_THREADLOG - CtdlLogPrintf(CTDL_INFO, "Thread system, Started a scheduled thread \"%s\" (%ud).\n", - that_thread->name, that_thread->tid); + CtdlLogPrintf(CTDL_INFO, + "Thread system, Started a scheduled thread \"%s\" (%ud).\n", + that_thread->name, + that_thread->tid); #endif } } } #ifdef WITH_THREADLOG - else - { - CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" will start in %ld seconds.\n", that_thread->name, that_thread->when - time(NULL)); + else { + CtdlLogPrintf(CTDL_DEBUG, + "Thread \"%s\" will start in %ld seconds.\n", + that_thread->name, + that_thread->when - time(NULL)); } #endif } @@ -1116,10 +1178,11 @@ void ctdl_thread_internal_check_scheduled(void) /* * A warapper function for select so we can show a thread as blocked */ -int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) +int CtdlThreadSelect(int n, fd_set * readfds, fd_set * writefds, + fd_set * exceptfds, struct timeval *timeout) { int ret; - + ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED); ret = select(n, readfds, writefds, exceptfds, timeout); ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING); @@ -1129,15 +1192,17 @@ int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds void *new_worker_thread(void *arg); -extern void close_masters (void); - - +extern void close_masters(void); +void *select_on_master(void *args); +void *select_on_client(void *args); +CtdlThreadNode *client_select_thread; +CtdlThreadNode *master_select_thread; void go_threading(void) { int i; CtdlThreadNode *last_worker; - + /* * Initialise the thread system */ @@ -1145,11 +1210,24 @@ void go_threading(void) /* Second call to module init functions now that threading is up */ initialise_modules(1); + + + CtdlThreadCreate("House keeping", + CTDLTHREAD_BIGSTACK, + do_housekeeping, NULL); + + +#ifdef NEW_WORKER + master_select_thread = CtdlThreadCreate ("Select on Master", 0, select_on_master, NULL); + client_select_thread = CtdlThreadCreate ("Select on client", 0, select_on_client, NULL); +#endif /* * This thread is now used for garbage collection of other threads in the thread list */ - CtdlLogPrintf(CTDL_INFO, "Startup thread %d becoming garbage collector,\n", citthread_self()); + CtdlLogPrintf(CTDL_INFO, + "Startup thread %d becoming garbage collector,\n", + citthread_self()); /* * We do a lot of locking and unlocking of the thread list in here. @@ -1157,56 +1235,57 @@ void go_threading(void) * that may be waiting on the thread list. * We are a low priority thread so we can afford to do this */ - - while (CtdlThreadGetCount()) - { + + while (CtdlThreadGetCount()) { if (CT->signal) exit_signal = CT->signal; - if (exit_signal) - { + if (exit_signal) { CtdlThreadStopAll(); -// close_masters(); +// close_masters(); } check_sched_shutdown(); - if (CT->state > CTDL_THREAD_STOP_REQ) - { + if (CT->state > CTDL_THREAD_STOP_REQ) { begin_critical_section(S_THREAD_LIST); ctdl_thread_internal_calc_loadavg(); end_critical_section(S_THREAD_LIST); - - ctdl_thread_internal_check_scheduled(); /* start scheduled threads */ + + ctdl_thread_internal_check_scheduled(); /* start scheduled threads */ } - + /* Reduce the size of the worker thread pool if necessary. */ - if ((CtdlThreadGetWorkers() > config.c_min_workers + 1) && (CtdlThreadWorkerAvg < 20) && (CT->state > CTDL_THREAD_STOP_REQ)) - { + if ((CtdlThreadGetWorkers() > config.c_min_workers + 1) + && (CtdlThreadWorkerAvg < 20) + && (CT->state > CTDL_THREAD_STOP_REQ)) { /* Ask a worker thread to stop as we no longer need it */ begin_critical_section(S_THREAD_LIST); last_worker = CtdlThreadList; - while (last_worker) - { - citthread_mutex_lock(&last_worker->ThreadMutex); - if (last_worker->flags & CTDLTHREAD_WORKER && (last_worker->state > CTDL_THREAD_STOPPING) && (last_worker->Context == NULL)) - { - citthread_mutex_unlock(&last_worker->ThreadMutex); + while (last_worker) { + citthread_mutex_lock(&last_worker-> + ThreadMutex); + if (last_worker->flags & CTDLTHREAD_WORKER + && (last_worker->state > + CTDL_THREAD_STOPPING) + && (last_worker->Context == NULL)) { + citthread_mutex_unlock + (&last_worker->ThreadMutex); break; } - citthread_mutex_unlock(&last_worker->ThreadMutex); + citthread_mutex_unlock(&last_worker-> + ThreadMutex); last_worker = last_worker->next; } end_critical_section(S_THREAD_LIST); - if (last_worker) - { + if (last_worker) { #ifdef WITH_THREADLOG - CtdlLogPrintf(CTDL_DEBUG, "Thread system, stopping excess worker thread \"%s\" (%ld).\n", - last_worker->name, - last_worker->tid - ); + CtdlLogPrintf(CTDL_DEBUG, + "Thread system, stopping excess worker thread \"%s\" (%ld).\n", + last_worker->name, + last_worker->tid); #endif CtdlThreadStop(last_worker); } } - + /* * If all our workers are working hard, start some more to help out * with things @@ -1215,36 +1294,40 @@ void go_threading(void) * based on the system load */ #ifdef NEW_WORKER - if ((((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkers() <= num_sessions) ) || CtdlThreadGetWorkers() < config.c_min_workers) && (CT->state > CTDL_THREAD_STOP_REQ)) + if ((((CtdlThreadGetWorkers() < config.c_max_workers) + && (CtdlThreadGetWorkers() <= num_sessions)) + || CtdlThreadGetWorkers() < config.c_min_workers) + && (CT->state > CTDL_THREAD_STOP_REQ)) #else - if ((((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkerAvg() > 60) && (CtdlThreadGetLoadAvg() < 90) ) || CtdlThreadGetWorkers() < config.c_min_workers) && (CT->state > CTDL_THREAD_STOP_REQ)) -#endif /* NEW_WORKER */ + if ((((CtdlThreadGetWorkers() < config.c_max_workers) + && (CtdlThreadGetWorkerAvg() > 60) + && (CtdlThreadGetLoadAvg() < 90)) + || CtdlThreadGetWorkers() < config.c_min_workers) + && (CT->state > CTDL_THREAD_STOP_REQ)) +#endif /* NEW_WORKER */ { - for (i=0; i<5 ; i++) - { + for (i = 0; i < 5; i++) { #ifdef NEW_WORKER CtdlThreadCreate("Worker Thread (new)", - CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER, - new_worker_thread, - NULL - ); + CTDLTHREAD_BIGSTACK + + CTDLTHREAD_WORKER, + new_worker_thread, NULL); #else CtdlThreadCreate("Worker Thread", - CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER, - worker_thread, - NULL - ); -#endif /* NEW_WORKER */ + CTDLTHREAD_BIGSTACK + + CTDLTHREAD_WORKER, + worker_thread, NULL); +#endif /* NEW_WORKER */ } } - + CtdlThreadGC(); - - if (CtdlThreadGetCount() <= 1) // Shutting down clean up the garbage collector + + if (CtdlThreadGetCount() <= 1) // Shutting down clean up the garbage collector { CtdlThreadGC(); } - + if (CtdlThreadGetCount()) CtdlThreadSleep(1); } @@ -1257,174 +1340,231 @@ void go_threading(void) + /* * Starting a new implimentation of a worker thread. * This new implimentation will be faster and do more work per thread. */ - -/* + +// TODO: need to sort out the thread states and signals +// TODO: slect_on_master should not be a big stack thread. +// TODO: slect_on_client should not be a big stack thread. +// TODO: select_on_master is not a worker thread and should be blocked when in select +// TODO: select_on_client is not a worker thread and should be blocked when in select +/** * Select on master socket. - * First worker thread in here acquires the lock and builds an FDSET of master sockets. - * then it goes into a loop selecting on the master sockets timing out every few milliseconds. - * If it times out it rebiulds its list and loops. - * If the select succeeds it creates a new context and returns. - * During this time the other workers are selecting on existing contexts or sleeping. + * One specific thread comes in here and never leaves. + * This thread blocks on select until something happens. + * The select only returns if a new connection is made or the select is interrupted by some means. + * We need to interrupt the select if the list of ServiceHook's changes or we are shutting down. + * We should probably use a signal to interrupt the select is a ServiceHook is created. + * When a ServiceHook is destroyed its socket will close which will awaken the select. */ -void select_on_master(void) +void *select_on_master(void *arg) { - fd_set readfds; - struct ServiceFunctionHook *serviceptr; - int ssock; /* Descriptor for client socket */ - int highest; - int m, i; - int retval = 0; - struct timeval tv; - struct CitContext *con; - const char *old_name; - - - - old_name = CtdlThreadName("select_on_master"); - - /* Initialize the fdset. */ - FD_ZERO(&readfds); - highest = 0; - - /* First, add the various master sockets to the fdset. */ - for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) { - m = serviceptr->msock; - FD_SET(m, &readfds); - if (m > highest) { - highest = m; - } - } - - tv.tv_sec = 1; /* wake up every 1 sec if no input */ - tv.tv_usec = 0; - retval = CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv); - - /* Select got an error or we are shutting down so get out */ - if (retval == 0 || CtdlThreadCheckStop()) { - CtdlThreadName(old_name); - return; - } - - /* Select says something happened on one of our master sockets so now we handle it */ - for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) { - if (FD_ISSET(serviceptr->msock, &readfds)) { - ssock = accept(serviceptr->msock, NULL, 0); - if (ssock >= 0) { - CtdlLogPrintf(CTDL_DEBUG, "New client socket %d\n", ssock); - /* The master socket is non-blocking but the client - * sockets need to be blocking, otherwise certain - * operations barf on FreeBSD. Not a fatal error. - */ - if (fcntl(ssock, F_SETFL, 0) < 0) { - CtdlLogPrintf(CTDL_EMERG, - "citserver: Can't set socket to blocking: %s\n", - strerror(errno)); - } - - /* New context will be created already - * set up in the CON_EXECUTING state. - */ - con = CreateNewContext(); - CT->Context = con; - - /* Assign our new socket number to it. */ - con->client_socket = ssock; - con->h_command_function = serviceptr->h_command_function; - con->h_async_function = serviceptr->h_async_function; - con->ServiceName = serviceptr->ServiceName; - /* Determine whether it's a local socket */ - if (serviceptr->sockpath != NULL) - con->is_local_socket = 1; - - /* Set the SO_REUSEADDR socket option */ - i = 1; - setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)); - - become_session(con); - begin_session(con); - serviceptr->h_greeting_function(); - become_session(NULL); - con->state = CON_IDLE; - break; - } - } - } - - CtdlThreadName(old_name); + fd_set readfds; + struct ServiceFunctionHook *serviceptr; + int ssock; /* Descriptor for client socket */ + int highest; + int m, i; + int retval = 0; + struct CitContext *con; + + + + while (!CtdlThreadCheckStop()) { + CtdlThreadName("select_on_master"); + + /* Initialize the fdset. */ + FD_ZERO(&readfds); + highest = 0; + + /* First, add the various master sockets to the fdset. */ + for (serviceptr = ServiceHookTable; serviceptr != NULL; + serviceptr = serviceptr->next) { + m = serviceptr->msock; + FD_SET(m, &readfds); + if (m > highest) { + highest = m; + } + } + + /** We can block indefinately since something will wake us up eventually + * Even if it is a signal telling us the system is shutting down + */ + retval = + CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, + NULL); + + /** Select got an error or we are shutting down so get out */ + if (retval == 0 || CtdlThreadCheckStop()) { + return NULL; + } + + /** Select says something happened on one of our master sockets so now we handle it */ + for (serviceptr = ServiceHookTable; serviceptr != NULL; + serviceptr = serviceptr->next) { + if (FD_ISSET(serviceptr->msock, &readfds)) { + ssock = accept(serviceptr->msock, NULL, 0); + if (ssock >= 0) { + CtdlLogPrintf(CTDL_DEBUG, + "New client socket %d\n", + ssock); + /* The master socket is non-blocking but the client + * sockets need to be blocking, otherwise certain + * operations barf on FreeBSD. Not a fatal error. + */ + if (fcntl(ssock, F_SETFL, 0) < 0) { + CtdlLogPrintf(CTDL_EMERG, + "citserver: Can't set socket to blocking: %s\n", + strerror + (errno)); + } + /* New context will be created already + * set up in the CON_EXECUTING state. + */ + con = CreateNewContext(); + /* Assign our new socket number to it. */ + con->client_socket = ssock; + con->h_command_function = + serviceptr->h_command_function; + con->h_async_function = + serviceptr->h_async_function; + con->ServiceName = + serviceptr->ServiceName; + con->h_greeting_function = serviceptr->h_greeting_function; + /* Determine whether it's a local socket */ + if (serviceptr->sockpath != NULL) + con->is_local_socket = 1; + + /* Set the SO_REUSEADDR socket option */ + i = 1; + setsockopt(ssock, SOL_SOCKET, + SO_REUSEADDR, &i, + sizeof(i)); + + /** Now we can pass this context to an idle worker thread to get things going + * What if there are no idle workers? + * We could create one but what if the thread list is full? + * Then I guess we need to close the socket a reject the connection. + */ + /** TODO: If there are no idle threads then this server is overloaded and we should reject the connection + * This will have the effect of throttling the incomming connections on master sockets + * a little and slow the process down. + */ +// if (idle_workers) + { + con->state = CON_START; + citthread_kill(client_select_thread->tid, SIGUSR1); + citthread_cond_signal(&worker_block); + } + // else + // output try later message + //start_context(con); + } + } + } + } + return NULL; } + + /* * Select on client socket. - * First worker thread in here acquires the lock and builds an FDSET of client sockets. - * then it selects on the client sockets timing out after 1 second. - * If it times out the thread goes off to check on housekeeping etc. - * If the select succeeds the thread goes off to handle the client request. - * If the list of client connections is empty the threads all sleep for one second + * Only one dedicated thread in here. + * We have to interrupt our select whenever a context is returned to the CON_READY state. + * as a result each context may be close to timing out its client so we have to calculate + * which client socket will timeout first and expire our select on that time. + * */ -struct CitContext *select_on_client(void) +void *select_on_client(void *arg) { fd_set readfds; - struct timeval tv; + struct timeval tv, now, result; int retval = 0; - int highest=0; - const char *old_name; - - - old_name = CtdlThreadName("select_on_client"); - - /* Initialise the fdset */ - FD_ZERO(&readfds); - FD_SET(CT->Context->client_socket, &readfds); - highest = CT->Context->client_socket; - /* Now we can select on any connections that are waiting */ - - if (!CtdlThreadCheckStop()) - { - tv.tv_sec = config.c_sleeping; /* wake up every second if no input */ + int highest; + struct CitContext *ptr; + + CtdlThreadName("select_on_client"); + + while (!CtdlThreadCheckStop()) { + /* Initialise the fdset */ + FD_ZERO(&readfds); + highest = 0; + /** Get the clients to select on */ + tv.tv_sec = config.c_sleeping; tv.tv_usec = 0; - retval = select(highest + 1, &readfds, NULL, NULL, &tv); - } - else /* Shutting down? */ - { - CtdlThreadName(old_name); - return(NULL); - } - + begin_critical_section(S_SESSION_TABLE); + for (ptr = ContextList; ptr != NULL; ptr = ptr->next) { + if (ptr->state == CON_IDLE) { + gettimeofday(&now, NULL); + timersub(&(ptr->client_expires_at), + &now, &result); + if (result.tv_sec <= 0) { + /** This client has timed out so kill it */ + ptr->kill_me = 1; + continue; + } + /** Is this one going to expire first? */ + timersub(&result, &tv, &now); + if (now.tv_sec <= 0 && now.tv_usec <= 0) { + tv.tv_sec = result.tv_sec; + tv.tv_usec = result.tv_usec; + } + FD_SET(ptr->client_socket, &readfds); + if (ptr->client_socket > highest) + highest = ptr->client_socket; + } + } + end_critical_section(S_SESSION_TABLE); - /* Now figure out who made this select() unblock. - * First, check for an error or exit condition. - */ - if (retval < 0) { - if (errno == EBADF) { - CtdlLogPrintf(CTDL_NOTICE, "select() failed: (%s)\n", - strerror(errno)); + /* Now we can select on any connections that are waiting */ + if (!CtdlThreadCheckStop()) { + retval = + CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv); + } else { /* Shutting down? */ + + return NULL; } - if (errno != EINTR) { - CtdlLogPrintf(CTDL_EMERG, "Exiting (%s)\n", strerror(errno)); - CtdlThreadStopAll(); - } else if (!CtdlThreadCheckStop()) { - CtdlLogPrintf(CTDL_DEBUG, "Un handled select failure.\n"); + + + /* Now figure out who made this select() unblock. + * First, check for an error or exit condition. + */ + if (retval < 0) { + if (errno == EBADF) { + CtdlLogPrintf(CTDL_NOTICE, + "select() failed: (%s)\n", + strerror(errno)); + } + if (errno != EINTR) { + CtdlLogPrintf(CTDL_EMERG, + "Exiting (%s)\n", + strerror(errno)); + CtdlThreadStopAll(); + } else if (!CtdlThreadCheckStop()) { + CtdlLogPrintf(CTDL_DEBUG, + "Un handled select failure.\n"); + } + } else if (retval > 0) { + begin_critical_section(S_SESSION_TABLE); + for (ptr = ContextList; ptr != NULL; + ptr = ptr->next) { + if ((FD_ISSET + (ptr->client_socket, &readfds)) + && (ptr->state == CON_IDLE)) { + ptr->input_waiting = 1; + ptr->state = CON_READY; + /** reset client expire time */ + ptr->client_expires_at.tv_sec = config.c_sleeping; + ptr->client_expires_at.tv_usec = 0; + } + } + end_critical_section(S_SESSION_TABLE); } - CtdlThreadName(old_name); - return NULL; - } - else if(retval == 0) - { - CtdlThreadName(old_name); - CT->Context->kill_me = 1; - CT->Context = NULL; - return CT->Context; } - - CT->Context->state = CON_EXECUTING; - CT->Context->input_waiting = 1; - - CtdlThreadName(old_name); - return (CT->Context); + return NULL; } @@ -1435,7 +1575,7 @@ struct CitContext *select_on_client(void) int execute_session(struct CitContext *bind_me) { int force_purge; - + become_session(bind_me); /* If the client has sent a command, execute it. */ @@ -1447,11 +1587,11 @@ int execute_session(struct CitContext *bind_me) /* If there are asynchronous messages waiting and the * client supports it, do those now */ if ((CC->is_async) && (CC->async_waiting) - && (CC->h_async_function != NULL)) { + && (CC->h_async_function != NULL)) { CC->h_async_function(); CC->async_waiting = 0; } - + force_purge = CC->kill_me; if (force_purge) CT->Context = NULL; @@ -1467,12 +1607,12 @@ extern void dead_session_purge(int force); /* * A new worker_thread loop. */ - + void *new_worker_thread(void *arg) { - struct CitContext *bind_me; + struct CitContext *bind_me, *ptr; int force_purge; - + while (!CtdlThreadCheckStop()) { /* make doubly sure we're not holding any stale db handles @@ -1480,26 +1620,44 @@ void *new_worker_thread(void *arg) */ cdb_check_handles(); force_purge = 0; - bind_me = NULL; /* Which session shall we handle? */ - - if (CT->Context == NULL) - select_on_master(); - if (CtdlThreadCheckStop()) - break; - - if (CT->Context) - bind_me = select_on_client(); - if (CtdlThreadCheckStop()) - break; - - if (bind_me) - force_purge = execute_session(bind_me); - + bind_me = NULL; /* Which session shall we handle? */ + + begin_critical_section(S_SESSION_TABLE); + for (ptr = ContextList; ptr != NULL; ptr = ptr->next) { + if (ptr->state == CON_START) { + ptr->state = CON_EXECUTING; + end_critical_section(S_SESSION_TABLE); + become_session(ptr); + begin_session(ptr); + ptr->h_greeting_function(); + become_session(NULL); + ptr->state = CON_IDLE; + break; + } + if (ptr->state == CON_READY) { + ptr->state = CON_EXECUTING; + end_critical_section(S_SESSION_TABLE); + force_purge = execute_session(ptr); + break; + } + + } + end_critical_section(S_SESSION_TABLE); + dead_session_purge(force_purge); + + /** block the worker threads waiting for a select to do something */ + idle_workers++; + ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED); + citthread_mutex_lock(&worker_block_mutex); + citthread_cond_wait(&worker_block, &worker_block_mutex); + citthread_mutex_unlock(&worker_block_mutex); + ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING); + idle_workers--; + if (CtdlThreadCheckStop()) break; - - do_housekeeping(); + } return NULL; } -- 2.30.2