From 2c9048572f22002cd287cd3137b5f7461d35455b Mon Sep 17 00:00:00 2001 From: Art Cancro Date: Sun, 20 Mar 2011 14:00:45 -0400 Subject: [PATCH] Initial implementation of the worker thread pool size manager --- citadel/sysdep.c | 4 ++++ citadel/threads.c | 38 ++++++++++++-------------------------- citadel/threads.h | 3 +++ 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/citadel/sysdep.c b/citadel/sysdep.c index c3bbd7ad6..ee75bb9cb 100644 --- a/citadel/sysdep.c +++ b/citadel/sysdep.c @@ -1143,6 +1143,8 @@ void *worker_thread(void *blah) { struct timeval tv; int force_purge = 0; + ++num_workers; + while (!CtdlThreadCheckStop()) { /* make doubly sure we're not holding any stale db handles @@ -1259,6 +1261,7 @@ do_select: force_purge = 0; SKIP_SELECT: /* We're bound to a session */ + ++active_workers; if (bind_me != NULL) { become_session(bind_me); @@ -1292,6 +1295,7 @@ SKIP_SELECT: dead_session_purge(force_purge); do_housekeeping(); + --active_workers; } /* If control reaches this point, the server is shutting down */ return(NULL); diff --git a/citadel/threads.c b/citadel/threads.c index cb216febe..0d3caed9f 100644 --- a/citadel/threads.c +++ b/citadel/threads.c @@ -62,19 +62,8 @@ #include "event_client.h" -/* - * To create a thread you must call one of the create thread functions. - * You must pass it the address of (a pointer to a CtdlThreadNode initialised to NULL) like this - * struct CtdlThreadNode *node = NULL; - * pass in &node - * If the thread is created *node will point to the thread control structure for the created thread. - * If the thread creation fails *node remains NULL - * Do not free the memory pointed to by *node, it doesn't belong to you. - * This new interface duplicates much of the eCrash stuff. We should go for closer integration since that would - * remove the need for the calls to eCrashRegisterThread and friends - */ - -static int num_threads = 0; /* Current number of threads */ +int num_workers = 0; /* Current number of worker threads */ +int active_workers = 0; /* Number of ACTIVE worker threads */ pthread_key_t ThreadKey; pthread_mutex_t Critters[MAX_SEMAPHORES]; /* Things needing locking */ struct thread_tsd masterTSD; @@ -206,8 +195,6 @@ void CtdlThreadCreate(void *(*start_routine)(void*)) ret = pthread_attr_setstacksize(&attr, THREADSTACKSIZE); ret = pthread_create(&thread, &attr, CTC_backend, (void *)start_routine); if (ret != 0) syslog(LOG_EMERG, "pthread_create() : %s", strerror(errno)); - - ++num_threads; } @@ -231,20 +218,19 @@ void go_threading(void) CtdlThreadCreate(select_on_master); - /* FIXME temporary fixed size pool of worker threads */ - CtdlThreadCreate(worker_thread); - CtdlThreadCreate(worker_thread); - CtdlThreadCreate(worker_thread); + /* Begin with one worker thread. We will expand the pool if necessary */ CtdlThreadCreate(worker_thread); - CtdlThreadCreate(worker_thread); - CtdlThreadCreate(worker_thread); - CtdlThreadCreate(worker_thread); - CtdlThreadCreate(worker_thread); - - /* At this point I am a union worker and therefore serve no useful purpose. */ + /* The supervisor thread monitors worker threads and spawns more of them if it finds that + * they are all in use. FIXME make the 256 max threads a configurable value. + */ while(!CtdlThreadCheckStop()) { - sleep(3); + if ((active_workers == num_workers) && (num_workers < 256)) { + syslog(LOG_DEBUG, "worker threads: %d, active: %d\n", num_workers, active_workers); + CtdlThreadCreate(worker_thread); + syslog(LOG_DEBUG, "worker threads: %d, active: %d\n", num_workers, active_workers); + } + sleep(1); } /* Shut down */ diff --git a/citadel/threads.h b/citadel/threads.h index d080b81e7..f96ab77d5 100644 --- a/citadel/threads.h +++ b/citadel/threads.h @@ -33,6 +33,9 @@ struct thread_tsd { extern struct thread_tsd masterTSD; #define TSD MyThread() +extern int num_workers; +extern int active_workers; + struct thread_tsd *MyThread(void); int try_critical_section (int which_one); void begin_critical_section (int which_one); -- 2.30.2