From b6595aedb693cac23a2ce5781e13125fb7c70c96 Mon Sep 17 00:00:00 2001 From: Dave West Date: Sat, 1 Dec 2007 15:36:07 +0000 Subject: [PATCH] Try to rationalise the mutex lock/unlock sequence. Hunting for an occasional deadlock. --- citadel/include/ctdl_module.h | 2 + citadel/server_main.c | 28 +++--- citadel/sysdep.c | 178 ++++++++++++++++++++++++++-------- citadel/sysdep_decls.h | 4 +- 4 files changed, 152 insertions(+), 60 deletions(-) diff --git a/citadel/include/ctdl_module.h b/citadel/include/ctdl_module.h index bab177d69..5fbd9b147 100644 --- a/citadel/include/ctdl_module.h +++ b/citadel/include/ctdl_module.h @@ -117,6 +117,8 @@ char *CtdlThreadName(struct CtdlThreadNode *thread, char *name); struct CtdlThreadNode *CtdlThreadSelf(void); int CtdlThreadGetCount(void); int CtdlThreadGetWorkers(void); +double CtdlThreadGetWorkerAvg(void); +double CtdlThreadGetLoadAvg(void); void CtdlThreadGC(void); void CtdlThreadStopAll(void); int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timeval *timeout, struct CtdlThreadNode *self); diff --git a/citadel/server_main.c b/citadel/server_main.c index 3715bb356..8758de3b5 100644 --- a/citadel/server_main.c +++ b/citadel/server_main.c @@ -392,10 +392,14 @@ void go_threading(void) last_worker = CtdlThreadList; while (last_worker) { + pthread_mutex_lock(&last_worker->ThreadMutex); if (last_worker->flags & CTDLTHREAD_WORKER && last_worker->state > CTDL_THREAD_STOPPING) + { + pthread_mutex_unlock(&last_worker->ThreadMutex); break; - else - last_worker = last_worker->next; + } + pthread_mutex_unlock(&last_worker->ThreadMutex); + last_worker = last_worker->next; } end_critical_section(S_THREAD_LIST); if (last_worker) @@ -414,40 +418,32 @@ void go_threading(void) * If all our workers are working hard, start some more to help out * with things */ - begin_critical_section(S_THREAD_LIST); /* FIXME: come up with a better way to dynamically alter the number of threads * based on the system load */ // if ((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkers() < num_sessions)) // && (CtdlThreadLoadAvg < 90) ) - if ((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadWorkerAvg > 60) && (CtdlThreadLoadAvg < 90) ) + if ((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkerAvg() > 60) && (CtdlThreadGetLoadAvg() < 90) ) { - end_critical_section(S_THREAD_LIST); for (i=0; i<5 ; i++) // for (i=0; i< (num_sessions - CtdlThreadGetWorkers()) ; i++) // for (i=0; i< (10 - (55 - CtdlThreadWorkerAvg) / CtdlThreadWorkerAvg / CtdlThreadGetWorkers()) ; i++) { - begin_critical_section(S_THREAD_LIST); - ctdl_internal_create_thread("Worker Thread", +// begin_critical_section(S_THREAD_LIST); + CtdlThreadCreate("Worker Thread", CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER, worker_thread, NULL ); - end_critical_section(S_THREAD_LIST); +// end_critical_section(S_THREAD_LIST); } } - else - end_critical_section(S_THREAD_LIST); - begin_critical_section(S_THREAD_LIST); - ctdl_internal_thread_gc(); - end_critical_section(S_THREAD_LIST); + CtdlThreadGC(); if (CtdlThreadGetCount() <= 1) // Shutting down clean up the garbage collector { - begin_critical_section(S_THREAD_LIST); - ctdl_internal_thread_gc(); - end_critical_section(S_THREAD_LIST); + CtdlThreadGC(); } if (CtdlThreadGetCount()) diff --git a/citadel/sysdep.c b/citadel/sysdep.c index dc6b862b1..2428152ce 100644 --- a/citadel/sysdep.c +++ b/citadel/sysdep.c @@ -1114,6 +1114,11 @@ void ctdl_thread_internal_init(void) // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready. memset (this_thread, 0, sizeof(struct CtdlThreadNode)); + pthread_mutex_init (&(this_thread->ThreadMutex), NULL); + pthread_cond_init (&(this_thread->ThreadCond), NULL); + pthread_mutex_init (&(this_thread->SleepMutex), NULL); + pthread_cond_init (&(this_thread->SleepCond), NULL); + /* We are garbage collector so create us as running */ this_thread->state = CTDL_THREAD_RUNNING; @@ -1125,9 +1130,6 @@ void ctdl_thread_internal_init(void) this_thread->name = strdup("Garbage Collection Thread"); - pthread_mutex_init (&(this_thread->ThreadMutex), NULL); - pthread_cond_init (&(this_thread->ThreadCond), NULL); - this_thread->tid = GC_thread; num_threads++; // Increase the count of threads in the system. @@ -1152,6 +1154,7 @@ void ctdl_thread_internal_init(void) gettimeofday(&now, NULL); timersub(&now, &(this_thread->last_state_change), &result); + pthread_mutex_lock(&this_thread->ThreadMutex); // result now has a timeval for the time we spent in the last state since we last updated last_duration = (double)result.tv_sec + ((double)result.tv_usec / (double) 1000000); if (this_thread->state == CTDL_THREAD_SLEEPING) @@ -1161,6 +1164,7 @@ void ctdl_thread_internal_init(void) if (this_thread->state == CTDL_THREAD_BLOCKED) this_thread->avg_blocked += last_duration; memcpy (&this_thread->last_state_change, &now, sizeof (struct timeval)); + pthread_mutex_unlock(&this_thread->ThreadMutex); } /* @@ -1171,8 +1175,8 @@ void ctdl_thread_internal_change_state (struct CtdlThreadNode *this_thread, enum /* * Wether we change state or not we need update the load values */ - pthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */ ctdl_thread_internal_update_avgs(this_thread); + pthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */ if ((new_state == CTDL_THREAD_STOP_REQ) && (this_thread->state > CTDL_THREAD_STOP_REQ)) this_thread->state = new_state; if (((new_state == CTDL_THREAD_SLEEPING) || (new_state == CTDL_THREAD_BLOCKED)) && (this_thread->state == CTDL_THREAD_RUNNING)) @@ -1202,6 +1206,7 @@ void CtdlThreadStopAll(void) ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ); // pthread_mutex_lock(&this_thread->ThreadMutex); pthread_cond_signal(&this_thread->ThreadCond); + pthread_cond_signal(&this_thread->SleepCond); // pthread_mutex_unlock(&this_thread->ThreadMutex); CtdlLogPrintf(CTDL_DEBUG, "Thread system stopping thread \"%s\" (%ld).\n", this_thread->name, this_thread->tid); } @@ -1212,13 +1217,13 @@ void CtdlThreadStopAll(void) /* - * A function to signal that we need to do garbage collection on the thread list + * A function to wake up all sleeping threads */ -void CtdlThreadGC(void) +void CtdlThreadWakeAll(void) { struct CtdlThreadNode *this_thread; - CtdlLogPrintf(CTDL_DEBUG, "Thread system signalling garbage collection.\n"); + CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n"); begin_critical_section(S_THREAD_LIST); this_thread = CtdlThreadList; @@ -1228,6 +1233,7 @@ void CtdlThreadGC(void) { // pthread_mutex_lock(&this_thread->ThreadMutex); pthread_cond_signal(&this_thread->ThreadCond); + pthread_cond_signal(&this_thread->SleepCond); // pthread_mutex_unlock(&this_thread->ThreadMutex); } this_thread = this_thread->next; @@ -1241,12 +1247,42 @@ void CtdlThreadGC(void) */ int CtdlThreadGetCount(void) { - return num_threads; + int ret; + + begin_critical_section(S_THREAD_LIST); + ret = num_threads; + end_critical_section(S_THREAD_LIST); + return ret; } int CtdlThreadGetWorkers(void) { - return num_workers; + int ret; + + begin_critical_section(S_THREAD_LIST); + ret = num_workers; + end_critical_section(S_THREAD_LIST); + return ret; +} + +double CtdlThreadGetWorkerAvg(void) +{ + double ret; + + begin_critical_section(S_THREAD_LIST); + ret = CtdlThreadWorkerAvg; + end_critical_section(S_THREAD_LIST); + return ret; +} + +double CtdlThreadGetLoadAvg(void) +{ + double ret; + + begin_critical_section(S_THREAD_LIST); + ret = CtdlThreadLoadAvg; + end_critical_section(S_THREAD_LIST); + return ret; } /* @@ -1263,11 +1299,14 @@ struct CtdlThreadNode *CtdlThreadSelf(void) this_thread = CtdlThreadList; while(this_thread) { + pthread_mutex_lock(&this_thread->ThreadMutex); if (pthread_equal(self_tid, this_thread->tid)) { + pthread_mutex_unlock(&this_thread->ThreadMutex); end_critical_section(S_THREAD_LIST); return this_thread; } + pthread_mutex_unlock(&this_thread->ThreadMutex); this_thread = this_thread->next; } end_critical_section(S_THREAD_LIST); @@ -1295,13 +1334,15 @@ char *CtdlThreadName(struct CtdlThreadNode *thread, char *name) CtdlLogPrintf(CTDL_WARNING, "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n", name); return NULL; } - begin_critical_section(S_THREAD_LIST); +// begin_critical_section(S_THREAD_LIST); + pthread_mutex_lock(&this_thread->ThreadMutex); old_name = this_thread->name; if (name) this_thread->name = strdup (name); else old_name = strdup(old_name); - end_critical_section (S_THREAD_LIST); + pthread_mutex_unlock(&this_thread->ThreadMutex); +// end_critical_section (S_THREAD_LIST); return (old_name); } @@ -1331,10 +1372,10 @@ void CtdlThreadCancel(struct CtdlThreadNode *thread) return; } - begin_critical_section(S_THREAD_LIST); +// begin_critical_section(S_THREAD_LIST); ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_CANCELLED); pthread_cancel(this_thread->tid); - end_critical_section (S_THREAD_LIST); +// end_critical_section (S_THREAD_LIST); } @@ -1384,12 +1425,13 @@ void CtdlThreadStop(struct CtdlThreadNode *thread) if (!(this_thread->thread_func)) return; // Don't stop garbage collector - begin_critical_section (S_THREAD_LIST); +// begin_critical_section (S_THREAD_LIST); ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ); // pthread_mutex_lock(&this_thread->ThreadMutex); pthread_cond_signal(&this_thread->ThreadCond); + pthread_cond_signal(&this_thread->SleepCond); // pthread_mutex_unlock(&this_thread->ThreadMutex); - end_critical_section(S_THREAD_LIST); +// end_critical_section(S_THREAD_LIST); } /* @@ -1409,20 +1451,24 @@ void CtdlThreadSleep(int secs) return; } - begin_critical_section(S_THREAD_LIST); - ctdl_thread_internal_change_state (self, CTDL_THREAD_SLEEPING); - pthread_mutex_lock(&self->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */ - end_critical_section(S_THREAD_LIST); - memset (&wake_time, 0, sizeof(struct timespec)); gettimeofday(&time_now, NULL); wake_time.tv_sec = time_now.tv_sec + secs; wake_time.tv_nsec = time_now.tv_usec * 10; - pthread_cond_timedwait(&self->ThreadCond, &self->ThreadMutex, &wake_time); - begin_critical_section(S_THREAD_LIST); + +// begin_critical_section(S_THREAD_LIST); + ctdl_thread_internal_change_state (self, CTDL_THREAD_SLEEPING); +// end_critical_section(S_THREAD_LIST); + +// pthread_mutex_lock(&self->SleepMutex); /* Prevent something asking us to awaken before we've gone to sleep */ + pthread_mutex_lock(&self->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */ + pthread_cond_timedwait(&self->SleepCond, &self->ThreadMutex, &wake_time); pthread_mutex_unlock(&self->ThreadMutex); +// pthread_mutex_unlock(&self->SleepMutex); + +// begin_critical_section(S_THREAD_LIST); ctdl_thread_internal_change_state (self, CTDL_THREAD_RUNNING); - end_critical_section(S_THREAD_LIST); +// end_critical_section(S_THREAD_LIST); } @@ -1438,14 +1484,14 @@ static void ctdl_internal_thread_cleanup(void *arg) * NB. WE ARE THE CURRENT THREAD */ CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n", this_thread->name, this_thread->tid); - begin_critical_section(S_THREAD_LIST); +// begin_critical_section(S_THREAD_LIST); #ifdef HAVE_BACKTRACE eCrash_UnregisterThread(); #endif pthread_mutex_lock(&this_thread->ThreadMutex); this_thread->state = CTDL_THREAD_EXITED; // needs to be last thing else house keeping will unlink us too early pthread_mutex_unlock(&this_thread->ThreadMutex); - end_critical_section(S_THREAD_LIST); +// end_critical_section(S_THREAD_LIST); // CtdlThreadGC(); } @@ -1464,8 +1510,8 @@ void ctdl_thread_internal_calc_loadavg(void) while(that_thread) { /* Update load averages */ - pthread_mutex_lock(&that_thread->ThreadMutex); ctdl_thread_internal_update_avgs(that_thread); + pthread_mutex_lock(&that_thread->ThreadMutex); that_thread->load_avg = that_thread->avg_sleeping + that_thread->avg_running + that_thread->avg_blocked; that_thread->load_avg = that_thread->avg_running / that_thread->load_avg * 100; that_thread->avg_sleeping /= 2; @@ -1502,11 +1548,13 @@ void ctdl_thread_internal_calc_loadavg(void) * Garbage collection routine. * Gets called by main() in a loop to clean up the thread list periodically. */ -void ctdl_internal_thread_gc (void) +void CtdlThreadGC (void) { struct CtdlThreadNode *this_thread, *that_thread; int workers = 0; + begin_critical_section(S_THREAD_LIST); + /* Handle exiting of garbage collector thread */ if(num_threads == 1) CtdlThreadList->state = CTDL_THREAD_EXITED; @@ -1524,15 +1572,19 @@ void ctdl_internal_thread_gc (void) this_thread = this_thread->next; /* Do we need to clean up this thread? */ + pthread_mutex_lock(&that_thread->ThreadMutex); if (that_thread->state != CTDL_THREAD_EXITED) { if(that_thread->flags & CTDLTHREAD_WORKER) workers++; /* Sanity check on number of worker threads */ + pthread_mutex_unlock(&that_thread->ThreadMutex); continue; } if (pthread_equal(that_thread->tid, pthread_self()) && that_thread->thread_func) { /* Sanity check */ + pthread_mutex_unlock(&that_thread->ThreadMutex); + end_critical_section(S_THREAD_LIST); CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, a thread is trying to clean up after itself.\n"); abort(); return; @@ -1540,11 +1592,16 @@ void ctdl_internal_thread_gc (void) if (num_threads <= 0) { /* Sanity check */ + pthread_mutex_unlock(&that_thread->ThreadMutex); + end_critical_section(S_THREAD_LIST); CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n"); abort(); return; } + if(that_thread->flags & CTDLTHREAD_WORKER) + num_workers--; /* This is a wroker thread so reduce the count. */ + num_threads--; /* If we are unlinking the list head then the next becomes the list head */ if (that_thread == CtdlThreadList) CtdlThreadList = that_thread->next; @@ -1552,10 +1609,12 @@ void ctdl_internal_thread_gc (void) that_thread->prev->next = that_thread->next; if(that_thread->next) that_thread->next->prev = that_thread->next; - num_threads--; - if(that_thread->flags & CTDLTHREAD_WORKER) - num_workers--; /* This is a wroker thread so reduce the count. */ + pthread_mutex_unlock(&that_thread->ThreadMutex); + pthread_cond_signal(&that_thread->ThreadCond); + pthread_cond_signal(&that_thread->SleepCond); // Make sure this thread is awake + pthread_mutex_lock(&that_thread->ThreadMutex); // Make sure it has done what its doing + pthread_mutex_unlock(&that_thread->ThreadMutex); /* * Join on the thread to do clean up and prevent memory leaks * Also makes sure the thread has cleaned up after itself before we remove it from the list @@ -1572,6 +1631,8 @@ void ctdl_internal_thread_gc (void) free(that_thread->name); pthread_mutex_destroy(&that_thread->ThreadMutex); pthread_cond_destroy(&that_thread->ThreadCond); + pthread_mutex_destroy(&that_thread->SleepMutex); + pthread_cond_destroy(&that_thread->SleepCond); pthread_attr_destroy(&that_thread->attr); free(that_thread); } @@ -1579,12 +1640,14 @@ void ctdl_internal_thread_gc (void) /* Sanity check number of worker threads */ if (workers != num_workers) { + end_critical_section(S_THREAD_LIST); CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n", workers, num_workers ); abort(); } + end_critical_section(S_THREAD_LIST); } @@ -1605,22 +1668,34 @@ static void *ctdl_internal_thread_func (void *arg) * can continue its execution. */ begin_critical_section(S_THREAD_LIST); + this_thread = (struct CtdlThreadNode *) arg; + gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */ + pthread_mutex_lock(&this_thread->ThreadMutex); + // Register the cleanup function to take care of when we exit. pthread_cleanup_push(ctdl_internal_thread_cleanup, NULL); // Get our thread data structure - this_thread = (struct CtdlThreadNode *) arg; + this_thread->pid = getpid(); + memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval)); /* Changed state so mark it. */ /* Only change to running state if we weren't asked to stop during the create cycle * Other wise there is a window to allow this threads creation to continue to full grown and * therby prevent a shutdown of the server. */ + pthread_mutex_unlock(&this_thread->ThreadMutex); + if (!CtdlThreadCheckStop(this_thread)) + { + pthread_mutex_lock(&this_thread->ThreadMutex); this_thread->state = CTDL_THREAD_RUNNING; - - this_thread->pid = getpid(); - gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */ - memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval)); /* Changed state so mark it. */ + pthread_mutex_unlock(&this_thread->ThreadMutex); + } end_critical_section(S_THREAD_LIST); - + + // Register for tracing + #ifdef HAVE_BACKTRACE + eCrash_RegisterThread(this_thread->name, 0); + #endif + // Tell the world we are here CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n", this_thread->name, this_thread->tid); @@ -1668,9 +1743,22 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready. memset (this_thread, 0, sizeof(struct CtdlThreadNode)); + /* Create the mutex's early so we can use them */ + pthread_mutex_init (&(this_thread->ThreadMutex), NULL); + pthread_cond_init (&(this_thread->ThreadCond), NULL); + pthread_mutex_init (&(this_thread->SleepMutex), NULL); + pthread_cond_init (&(this_thread->SleepCond), NULL); + + pthread_mutex_lock(&this_thread->ThreadMutex); + this_thread->state = CTDL_THREAD_CREATE; if ((ret = pthread_attr_init(&this_thread->attr))) { + pthread_mutex_unlock(&this_thread->ThreadMutex); + pthread_mutex_destroy(&(this_thread->ThreadMutex)); + pthread_cond_destroy(&(this_thread->ThreadCond)); + pthread_mutex_destroy(&(this_thread->SleepMutex)); + pthread_cond_destroy(&(this_thread->SleepCond)); CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_init: %s\n", strerror(ret)); free(this_thread); return NULL; @@ -1684,9 +1772,14 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void { CtdlLogPrintf(CTDL_INFO, "Thread system. Creating BIG STACK thread.\n"); if ((ret = pthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) { + pthread_mutex_unlock(&this_thread->ThreadMutex); + pthread_mutex_destroy(&(this_thread->ThreadMutex)); + pthread_cond_destroy(&(this_thread->ThreadCond)); + pthread_mutex_destroy(&(this_thread->SleepMutex)); + pthread_cond_destroy(&(this_thread->SleepCond)); + pthread_attr_destroy(&this_thread->attr); CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_setstacksize: %s\n", strerror(ret)); - pthread_attr_destroy(&this_thread->attr); free(this_thread); return NULL; } @@ -1714,8 +1807,6 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void * because the creation didn't affect the load average. */ this_thread->avg_blocked = 2; - pthread_mutex_init (&(this_thread->ThreadMutex), NULL); - pthread_cond_init (&(this_thread->ThreadCond), NULL); /* * We want to make sure that only the main thread handles signals, @@ -1745,6 +1836,8 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void free (this_thread->name); pthread_mutex_destroy(&(this_thread->ThreadMutex)); pthread_cond_destroy(&(this_thread->ThreadCond)); + pthread_mutex_destroy(&(this_thread->SleepMutex)); + pthread_cond_destroy(&(this_thread->SleepCond)); pthread_attr_destroy(&this_thread->attr); free(this_thread); if (sigtrick) @@ -1763,10 +1856,9 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void CtdlThreadList = this_thread; if (this_thread->next) this_thread->next->prev = this_thread; - // Register for tracing - #ifdef HAVE_BACKTRACE - eCrash_RegisterThread(this_thread->name, 0); - #endif + + pthread_mutex_unlock(&this_thread->ThreadMutex); + ctdl_thread_internal_calc_loadavg(); return this_thread; } diff --git a/citadel/sysdep_decls.h b/citadel/sysdep_decls.h index 023ac6433..a16a2e603 100644 --- a/citadel/sysdep_decls.h +++ b/citadel/sysdep_decls.h @@ -136,7 +136,9 @@ extern struct CtdlThreadNode { long flags; /* Flags that describe this thread */ enum CtdlThreadState state; /* Flag to show state of this thread */ pthread_mutex_t ThreadMutex; /* A mutex to sync this thread to others if this thread allows (also used for sleeping) */ - pthread_cond_t ThreadCond; /* A condition variable to sync this thread with others (also used for sleeping) */ + pthread_cond_t ThreadCond; /* A condition variable to sync this thread with others */ + pthread_mutex_t SleepMutex; /* A mutex for sleeping */ + pthread_cond_t SleepCond; /* A condition variable for sleeping */ pthread_attr_t attr; /* Attributes of this thread */ struct timeval start_time; /* Time this thread was started */ struct timeval last_state_change; /* Time when this thread last changed state */ -- 2.30.2