From 8c538eeb2d9b024970c350730b4748b870762c5f Mon Sep 17 00:00:00 2001 From: Dave West Date: Thu, 6 Dec 2007 22:44:34 +0000 Subject: [PATCH] Added the functions to allow scheduling of a thread to start at some time in the future. Fixed a bug in garbage collection that caused a corrupted thread list when unlinking dead threads. Fixed a bug in context_cleanup, the ContextList needed to be updated on each pass since a module may try to iterate the list as a result of the event triggerred here. --- citadel/include/ctdl_module.h | 1 + citadel/server.h | 1 + citadel/server_main.c | 2 + citadel/sysdep.c | 296 ++++++++++++++++++++++++---------- citadel/sysdep_decls.h | 2 + 5 files changed, 220 insertions(+), 82 deletions(-) diff --git a/citadel/include/ctdl_module.h b/citadel/include/ctdl_module.h index b0508a96b..932e0fe38 100644 --- a/citadel/include/ctdl_module.h +++ b/citadel/include/ctdl_module.h @@ -113,6 +113,7 @@ void CtdlModuleStartCryptoMsgs(char *ok_response, char *nosup_response, char *er * Citadel Threads API */ struct CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args); +struct CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) (void *arg), void *args, time_t when); void CtdlThreadSleep(int secs); void CtdlThreadStop(struct CtdlThreadNode *thread); int CtdlThreadCheckStop(void); diff --git a/citadel/server.h b/citadel/server.h index d77726a44..01e513554 100644 --- a/citadel/server.h +++ b/citadel/server.h @@ -240,6 +240,7 @@ enum { S_NETSPOOL, S_THREAD_LIST, S_XMPP_QUEUE, + S_SCHEDULE_LIST, MAX_SEMAPHORES }; diff --git a/citadel/server_main.c b/citadel/server_main.c index c46c0857a..363c058cd 100644 --- a/citadel/server_main.c +++ b/citadel/server_main.c @@ -388,6 +388,8 @@ void go_threading(void) begin_critical_section(S_THREAD_LIST); ctdl_thread_internal_calc_loadavg(); end_critical_section(S_THREAD_LIST); + + ctdl_thread_internal_check_scheduled(); /* start scheduled threads */ } /* Reduce the size of the worker thread pool if necessary. */ diff --git a/citadel/sysdep.c b/citadel/sysdep.c index cf12a75cd..17e40ce57 100644 --- a/citadel/sysdep.c +++ b/citadel/sysdep.c @@ -789,6 +789,11 @@ void context_cleanup(void) * There are no threads so no critical_section stuff is needed. */ ptr = ContextList; + + /* We need to update the ContextList because some modules may want to itterate it + * Question is should we NULL it before iterating here or should we just keep updating it + * as we remove items? + */ while (ptr != NULL){ /* Remove the session from the active list */ rem = ptr->next; @@ -798,8 +803,8 @@ void context_cleanup(void) RemoveContext(ptr); free (ptr); ptr = rem; + ContextList = rem; // Update ContextList since a module may try to iterate the list. } - } @@ -1009,6 +1014,7 @@ int convert_login(char NameToConvert[]) { struct CtdlThreadNode *CtdlThreadList = NULL; +struct CtdlThreadNode *CtdlThreadSchedList = NULL; /* * Condition variable and Mutex for thread garbage collection @@ -1022,63 +1028,6 @@ double CtdlThreadLoadAvg = 0; double CtdlThreadWorkerAvg = 0; pthread_key_t ThreadKey; -/* - * Pinched the following bits regarding signals from Kannel.org - */ - -/* - * Change this thread's signal mask to block user-visible signals - * (HUP, TERM, QUIT, INT), and store the old signal mask in - * *old_set_storage. - * Return 0 for success, or -1 if an error occurred. - */ - - /* - * This does not work in Darwin alias MacOS X alias Mach kernel, - * however. So we define a dummy function doing nothing. - */ -#if defined(DARWIN_OLD) - static int pthread_sigmask(); -#endif - -static int ctdl_thread_internal_block_signals(sigset_t *old_set_storage) -{ - int ret; - sigset_t block_signals; - - ret = sigemptyset(&block_signals); - if (ret != 0) { - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Couldn't initialize signal set\n"); - return -1; - } - ret = sigaddset(&block_signals, SIGHUP); - ret |= sigaddset(&block_signals, SIGTERM); - ret |= sigaddset(&block_signals, SIGQUIT); - ret |= sigaddset(&block_signals, SIGINT); - if (ret != 0) { - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Couldn't add signal to signal set.\n"); - return -1; - } - ret = pthread_sigmask(SIG_BLOCK, &block_signals, old_set_storage); - if (ret != 0) { - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Couldn't disable signals for thread creation\n"); - return -1; - } - return 0; -} - -static void ctdl_thread_internal_restore_signals(sigset_t *old_set) -{ - int ret; - - ret = pthread_sigmask(SIG_SETMASK, old_set, NULL); - if (ret != 0) { - CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Couldn't restore signal set.\n"); - } -} - - - /* * A function to destroy the TSD */ @@ -1139,11 +1088,26 @@ void ctdl_thread_internal_free_tsd(void) void ctdl_thread_internal_cleanup(void) { int i; + struct CtdlThreadNode *this_thread, *that_thread; for (i=0; inext; + pthread_mutex_destroy(&that_thread->ThreadMutex); + pthread_cond_destroy(&that_thread->ThreadCond); + pthread_mutex_destroy(&that_thread->SleepMutex); + pthread_cond_destroy(&that_thread->SleepCond); + pthread_attr_destroy(&that_thread->attr); + free(that_thread); + } ctdl_thread_internal_free_tsd(); } @@ -1602,12 +1566,12 @@ void CtdlThreadGC (void) num_workers--; /* This is a wroker thread so reduce the count. */ num_threads--; /* If we are unlinking the list head then the next becomes the list head */ - if (that_thread == CtdlThreadList) - CtdlThreadList = that_thread->next; if(that_thread->prev) that_thread->prev->next = that_thread->next; + else + CtdlThreadList = that_thread->next; if(that_thread->next) - that_thread->next->prev = that_thread->next; + that_thread->next->prev = that_thread->prev; pthread_mutex_unlock(&that_thread->ThreadMutex); pthread_cond_signal(&that_thread->ThreadCond); @@ -1725,8 +1689,6 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void { int ret = 0; struct CtdlThreadNode *this_thread; - int sigtrick = 0; - sigset_t old_signal_set; if (num_threads >= 32767) { @@ -1807,20 +1769,6 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void */ this_thread->avg_blocked = 2; - /* - * We want to make sure that only the main thread handles signals, - * so that each signal is handled exactly once. To do this, we - * make sure that each new thread has all the signals that we - * handle blocked. To avoid race conditions, we block them in - * the spawning thread first, then create the new thread (which - * inherits the settings), and then restore the old settings in - * the spawning thread. This means that there is a brief period - * when no signals will be processed, but during that time they - * should be queued by the operating system. - */ -// if (pthread_equal(GC_thread, pthread_self())) -// sigtrick = ctdl_thread_internal_block_signals(&old_signal_set) == 0; - /* * We pass this_thread into the thread as its args so that it can find out information * about itself and it has a bit of storage space for itself, not to mention that the REAL @@ -1831,20 +1779,16 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void CtdlLogPrintf(CTDL_ALERT, "Thread system, Can't create thread: %s\n", strerror(ret)); + pthread_mutex_unlock(&this_thread->ThreadMutex); pthread_mutex_destroy(&(this_thread->ThreadMutex)); pthread_cond_destroy(&(this_thread->ThreadCond)); pthread_mutex_destroy(&(this_thread->SleepMutex)); pthread_cond_destroy(&(this_thread->SleepCond)); pthread_attr_destroy(&this_thread->attr); free(this_thread); -// if (sigtrick) -// ctdl_thread_internal_restore_signals(&old_signal_set); return NULL; } -// if (sigtrick) -// ctdl_thread_internal_restore_signals(&old_signal_set); - num_threads++; // Increase the count of threads in the system. if(this_thread->flags & CTDLTHREAD_WORKER) num_workers++; @@ -1878,6 +1822,194 @@ struct CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_f +/* + * Internal function to schedule a thread. + * Must be called from within a S_THREAD_LIST critical section + */ +struct CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) (void *arg), void *args, time_t when) +{ + int ret = 0; + struct CtdlThreadNode *this_thread; + + if (num_threads >= 32767) + { + CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n"); + return NULL; + } + + this_thread = malloc(sizeof(struct CtdlThreadNode)); + if (this_thread == NULL) { + CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n"); + return NULL; + } + // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready. + memset (this_thread, 0, sizeof(struct CtdlThreadNode)); + + /* Create the mutex's early so we can use them */ + pthread_mutex_init (&(this_thread->ThreadMutex), NULL); + pthread_cond_init (&(this_thread->ThreadCond), NULL); + pthread_mutex_init (&(this_thread->SleepMutex), NULL); + pthread_cond_init (&(this_thread->SleepCond), NULL); + + this_thread->state = CTDL_THREAD_CREATE; + + if ((ret = pthread_attr_init(&this_thread->attr))) { + pthread_mutex_destroy(&(this_thread->ThreadMutex)); + pthread_cond_destroy(&(this_thread->ThreadCond)); + pthread_mutex_destroy(&(this_thread->SleepMutex)); + pthread_cond_destroy(&(this_thread->SleepCond)); + CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_init: %s\n", strerror(ret)); + free(this_thread); + return NULL; + } + + /* Our per-thread stacks need to be bigger than the default size, + * otherwise the MIME parser crashes on FreeBSD, and the IMAP service + * crashes on 64-bit Linux. + */ + if (flags & CTDLTHREAD_BIGSTACK) + { + CtdlLogPrintf(CTDL_INFO, "Thread system. Creating BIG STACK thread.\n"); + if ((ret = pthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) { + pthread_mutex_destroy(&(this_thread->ThreadMutex)); + pthread_cond_destroy(&(this_thread->ThreadCond)); + pthread_mutex_destroy(&(this_thread->SleepMutex)); + pthread_cond_destroy(&(this_thread->SleepCond)); + pthread_attr_destroy(&this_thread->attr); + CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_setstacksize: %s\n", + strerror(ret)); + free(this_thread); + return NULL; + } + } + + /* + * If we got here we are going to create the thread so we must initilise the structure + * first because most implimentations of threading can't create it in a stopped state + * and it might want to do things with its structure that aren't initialised otherwise. + */ + if(name) + { + this_thread->name = name; + } + else + { + this_thread->name = "Un-named Thread"; + } + + this_thread->flags = flags; + this_thread->thread_func = thread_func; + this_thread->user_args = args; + /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the + * load average for the system. If we don't do this then we create a mass of threads at the same time + * because the creation didn't affect the load average. + */ + this_thread->avg_blocked = 2; + + /* + * When to start this thread + */ + this_thread->when = when; + + begin_critical_section(S_SCHEDULE_LIST); + this_thread->next = CtdlThreadSchedList; + CtdlThreadSchedList = this_thread; + if (this_thread->next) + this_thread->next->prev = this_thread; + end_critical_section(S_SCHEDULE_LIST); + + return this_thread; +} + + + +struct CtdlThreadNode *ctdl_thread_internal_start_scheduled (struct CtdlThreadNode *this_thread) +{ + int ret = 0; + + /* + * We pass this_thread into the thread as its args so that it can find out information + * about itself and it has a bit of storage space for itself, not to mention that the REAL + * thread function needs to finish off the setup of the structure + */ + if ((ret = pthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0)) + { + + CtdlLogPrintf(CTDL_ALERT, "Thread system, Can't create thread: %s\n", + strerror(ret)); + return NULL; + } + + + num_threads++; // Increase the count of threads in the system. + if(this_thread->flags & CTDLTHREAD_WORKER) + num_workers++; + + this_thread->next = CtdlThreadList; + CtdlThreadList = this_thread; + if (this_thread->next) + this_thread->next->prev = this_thread; + + return this_thread; +} + + + +void ctdl_thread_internal_check_scheduled(void) +{ + struct CtdlThreadNode *this_thread, *that_thread; + time_t now; + + if (try_critical_section(S_SCHEDULE_LIST)) + return; /* If this list is locked we wait till the next chance */ + + now = time(NULL); + + this_thread = CtdlThreadSchedList; + while(this_thread) + { + that_thread = this_thread; + this_thread = this_thread->next; + + if (now > that_thread->when) + { + /* Unlink from schedule list */ + if (that_thread->next) + that_thread->next->prev = that_thread->prev; + if (that_thread->prev) + that_thread->prev->next = that_thread->next; + else + CtdlThreadSchedList = that_thread->next; + + begin_critical_section(S_THREAD_LIST); + if (CT->state > CTDL_THREAD_STOP_REQ) + { /* Only start it if the system is not stopping */ + pthread_mutex_lock(&that_thread->ThreadMutex); + if (ctdl_thread_internal_start_scheduled (that_thread) == NULL) + { + pthread_mutex_unlock(&that_thread->ThreadMutex); + pthread_mutex_destroy(&(that_thread->ThreadMutex)); + pthread_cond_destroy(&(that_thread->ThreadCond)); + pthread_mutex_destroy(&(that_thread->SleepMutex)); + pthread_cond_destroy(&(that_thread->SleepCond)); + pthread_attr_destroy(&that_thread->attr); + free(that_thread); + } + else + { + CtdlLogPrintf(CTDL_INFO, "Thread system, Started a sceduled thread \"%s\".\n", + that_thread->name); + pthread_mutex_unlock(&that_thread->ThreadMutex); + ctdl_thread_internal_calc_loadavg(); + } + } + end_critical_section(S_THREAD_LIST); + } + } + end_critical_section(S_SCHEDULE_LIST); +} + + /* * A warapper function for select so we can show a thread as blocked */ diff --git a/citadel/sysdep_decls.h b/citadel/sysdep_decls.h index f2480f47d..8c7e3059b 100644 --- a/citadel/sysdep_decls.h +++ b/citadel/sysdep_decls.h @@ -136,6 +136,7 @@ enum CtdlThreadState { extern struct CtdlThreadNode { pthread_t tid; /* id as returned by pthread_create() */ pid_t pid; /* pid, as best the OS will let us determine */ + time_t when; /* When to start a scheduled thread */ struct CitConext *Context; /* The session context that this thread mught be working on or NULL if none */ long number; /* A unigue number for this thread (not implimented yet) */ int wakefd_recv; /* An fd that this thread can sleep on (not implimented yet) */ @@ -177,6 +178,7 @@ void ctdl_thread_internal_cleanup(void); void ctdl_thread_internal_calc_loadavg(void); void ctdl_thread_internal_free_tsd(void); struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args); +void ctdl_thread_internal_check_scheduled(void); -- 2.30.2