Removed a few unneccesary lock calls to get a small speed up.
authorDave West <davew@uncensored.citadel.org>
Sat, 29 Dec 2007 20:10:06 +0000 (20:10 +0000)
committerDave West <davew@uncensored.citadel.org>
Sat, 29 Dec 2007 20:10:06 +0000 (20:10 +0000)
Changed the calc for load average. Now for the purposes of load averages
sleeping is the same as running since it means the thread can't do
something else in the mean time.

Started a new worker thread style for experimental purposes turn on by
un-commenting the #define NEW_WORKER at the top of threads.c

citadel/threads.c
citadel/threads.h

index 215dc7bb483fafbd17ab44408dfc3b68986fd418..caf91b45a07f2afa8c25db089aa77f5d02524040 100644 (file)
@@ -9,12 +9,33 @@
  */
 
 #include <errno.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#if TIME_WITH_SYS_TIME
+# include <sys/time.h>
+# include <time.h>
+#else
+# if HAVE_SYS_TIME_H
+#  include <sys/time.h>
+# else
+#  include <time.h>
+# endif
+#endif
+
 #include "threads.h"
 #include "ctdl_module.h"
 #include "modules_init.h"
 #include "housekeeping.h"
 #include "config.h"
+#include "citserver.h"
+#include "sysdep_decls.h"
+
+/*
+ * define this to use the new worker_thread method of handling connections
+ */
+//#define NEW_WORKER
 
 /*
  * New thread interface.
@@ -32,8 +53,8 @@
 static int num_threads = 0;                    /* Current number of threads */
 static int num_workers = 0;                    /* Current number of worker threads */
 
-struct CtdlThreadNode *CtdlThreadList = NULL;
-struct CtdlThreadNode *CtdlThreadSchedList = NULL;
+CtdlThreadNode *CtdlThreadList = NULL;
+CtdlThreadNode *CtdlThreadSchedList = NULL;
 
 /*
  * Condition variable and Mutex for thread garbage collection
@@ -50,6 +71,11 @@ pthread_key_t ThreadKey;
 pthread_mutex_t Critters[MAX_SEMAPHORES];      /* Things needing locking */
 
 
+/* Mutex's and condition vars for the various select stuff */
+pthread_mutex_t worker_select_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t worker_select_mutex2 = PTHREAD_MUTEX_INITIALIZER;
+
+
 
 void InitialiseSemaphores(void)
 {
@@ -177,7 +203,7 @@ void ctdl_thread_internal_free_tsd(void)
 void ctdl_thread_internal_cleanup(void)
 {
        int i;
-       struct CtdlThreadNode *this_thread, *that_thread;
+       CtdlThreadNode *this_thread, *that_thread;
        
        for (i=0; i<CTDL_THREAD_LAST_STATE; i++)
        {
@@ -202,7 +228,7 @@ void ctdl_thread_internal_cleanup(void)
 
 void ctdl_thread_internal_init(void)
 {
-       struct CtdlThreadNode *this_thread;
+       CtdlThreadNode *this_thread;
        int ret = 0;
        
        GC_thread = pthread_self();
@@ -218,13 +244,13 @@ void ctdl_thread_internal_init(void)
        CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked");
        
        /* Get ourself a thread entry */
-       this_thread = malloc(sizeof(struct CtdlThreadNode));
+       this_thread = malloc(sizeof(CtdlThreadNode));
        if (this_thread == NULL) {
                CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
                return;
        }
        // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
-       memset (this_thread, 0, sizeof(struct CtdlThreadNode));
+       memset (this_thread, 0, sizeof(CtdlThreadNode));
        
        pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
        pthread_cond_init (&(this_thread->ThreadCond), NULL);
@@ -260,13 +286,14 @@ void ctdl_thread_internal_init(void)
 /*
  * A function to update a threads load averages
  */
- void ctdl_thread_internal_update_avgs(struct CtdlThreadNode *this_thread)
+ void ctdl_thread_internal_update_avgs(CtdlThreadNode *this_thread)
  {
        struct timeval now, result;
        double last_duration;
 
        gettimeofday(&now, NULL);
        timersub(&now, &(this_thread->last_state_change), &result);
+       /* I don't think these mutex's are needed here */
        pthread_mutex_lock(&this_thread->ThreadMutex);
        // result now has a timeval for the time we spent in the last state since we last updated
        last_duration = (double)result.tv_sec + ((double)result.tv_usec / (double) 1000000);
@@ -283,12 +310,13 @@ void ctdl_thread_internal_init(void)
 /*
  * A function to chenge the state of a thread
  */
-void ctdl_thread_internal_change_state (struct CtdlThreadNode *this_thread, enum CtdlThreadState new_state)
+void ctdl_thread_internal_change_state (CtdlThreadNode *this_thread, enum CtdlThreadState new_state)
 {
        /*
         * Wether we change state or not we need update the load values
         */
        ctdl_thread_internal_update_avgs(this_thread);
+       /* This mutex not needed here? */
        pthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */
        if ((new_state == CTDL_THREAD_STOP_REQ) && (this_thread->state > CTDL_THREAD_STOP_REQ))
                this_thread->state = new_state;
@@ -308,7 +336,7 @@ void CtdlThreadStopAll(void)
        //FIXME: The signalling of the condition should not be in the critical_section
        // We need to build a list of threads we are going to signal and then signal them afterwards
        
-       struct CtdlThreadNode *this_thread;
+       CtdlThreadNode *this_thread;
        
        begin_critical_section(S_THREAD_LIST);
        this_thread = CtdlThreadList;
@@ -332,7 +360,7 @@ void CtdlThreadStopAll(void)
  */
 void CtdlThreadWakeAll(void)
 {
-       struct CtdlThreadNode *this_thread;
+       CtdlThreadNode *this_thread;
        
        CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n");
        
@@ -400,12 +428,9 @@ const char *CtdlThreadName(const char *name)
                CtdlLogPrintf(CTDL_WARNING, "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n", name);
                return NULL;
        }
-// FIXME: do we need this lock? I think not since the pointer asignmaent should be atomic
-       pthread_mutex_lock(&CT->ThreadMutex);
        old_name = CT->name;
        if (name)
                CT->name = name;
-       pthread_mutex_unlock(&CT->ThreadMutex);
        return (old_name);
 }      
 
@@ -413,9 +438,9 @@ const char *CtdlThreadName(const char *name)
 /*
  * A function to force a thread to exit
  */
-void CtdlThreadCancel(struct CtdlThreadNode *thread)
+void CtdlThreadCancel(CtdlThreadNode *thread)
 {
-       struct CtdlThreadNode *this_thread;
+       CtdlThreadNode *this_thread;
        
        if (!thread)
                this_thread = CT;
@@ -461,19 +486,15 @@ int CtdlThreadCheckStop(void)
        if (CT->signal)
                CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" caught signal %d.\n", CT->name, CT->signal);
 #endif
-       pthread_mutex_lock(&CT->ThreadMutex);
        if(state == CTDL_THREAD_STOP_REQ)
        {
                CT->state = CTDL_THREAD_STOPPING;
-               pthread_mutex_unlock(&CT->ThreadMutex);
                return -1;
        }
        else if((state < CTDL_THREAD_STOP_REQ) && (state > CTDL_THREAD_CREATE))
        {
-               pthread_mutex_unlock(&CT->ThreadMutex);
                return -1;
        }
-       pthread_mutex_unlock(&CT->ThreadMutex);
        return 0;
 }
 
@@ -482,9 +503,9 @@ int CtdlThreadCheckStop(void)
  * A function to ask a thread to exit
  * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit
  */
-void CtdlThreadStop(struct CtdlThreadNode *thread)
+void CtdlThreadStop(CtdlThreadNode *thread)
 {
-       struct CtdlThreadNode *this_thread;
+       CtdlThreadNode *this_thread;
        
        if (!thread)
                this_thread = CT;
@@ -557,7 +578,7 @@ static void ctdl_internal_thread_cleanup(void *arg)
  */
 void ctdl_thread_internal_calc_loadavg(void)
 {
-       struct CtdlThreadNode *that_thread;
+       CtdlThreadNode *that_thread;
        double load_avg, worker_avg;
        int workers = 0;
 
@@ -569,8 +590,7 @@ void ctdl_thread_internal_calc_loadavg(void)
                /* Update load averages */
                ctdl_thread_internal_update_avgs(that_thread);
                pthread_mutex_lock(&that_thread->ThreadMutex);
-               that_thread->load_avg = that_thread->avg_sleeping + that_thread->avg_running + that_thread->avg_blocked;
-               that_thread->load_avg = that_thread->avg_running / that_thread->load_avg * 100;
+               that_thread->load_avg = (that_thread->avg_sleeping + that_thread->avg_running) / (that_thread->avg_sleeping + that_thread->avg_running + that_thread->avg_blocked) * 100;
                that_thread->avg_sleeping /= 2;
                that_thread->avg_running /= 2;
                that_thread->avg_blocked /= 2;
@@ -581,7 +601,7 @@ void ctdl_thread_internal_calc_loadavg(void)
                        workers++;
                }
 #ifdef WITH_THREADLOG
-               CtdlLogPrintf(CTDL_DEBUG, "CtdlThread, \"%s\" (%ld) \"%s\" %f %f %f %f.\n",
+               CtdlLogPrintf(CTDL_DEBUG, "CtdlThread, \"%s\" (%lu) \"%s\" %.2f %.2f %.2f %.2f\n",
                        that_thread->name,
                        that_thread->tid,
                        CtdlThreadStates[that_thread->state],
@@ -596,7 +616,7 @@ void ctdl_thread_internal_calc_loadavg(void)
        CtdlThreadLoadAvg = load_avg/num_threads;
        CtdlThreadWorkerAvg = worker_avg/workers;
 #ifdef WITH_THREADLOG
-       CtdlLogPrintf(CTDL_INFO, "System load average %f, workers averag %f, threads %d, workers %d, sessions %d\n", CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads, num_workers, num_sessions);
+       CtdlLogPrintf(CTDL_INFO, "System load average %.2f, workers averag %.2f, threads %d, workers %d, sessions %d\n", CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads, num_workers, num_sessions);
 #endif
 }
 
@@ -607,8 +627,8 @@ void ctdl_thread_internal_calc_loadavg(void)
  */
 void CtdlThreadGC (void)
 {
-       struct CtdlThreadNode *this_thread, *that_thread;
-       int workers = 0;
+       CtdlThreadNode *this_thread, *that_thread;
+       int workers = 0, sys_workers;
        int ret=0;
        
        begin_critical_section(S_THREAD_LIST);
@@ -630,18 +650,15 @@ void CtdlThreadGC (void)
                this_thread = this_thread->next;
                
                /* Do we need to clean up this thread? */
-               pthread_mutex_lock(&that_thread->ThreadMutex);
                if (that_thread->state != CTDL_THREAD_EXITED)
                {
                        if(that_thread->flags & CTDLTHREAD_WORKER)
                                workers++;      /* Sanity check on number of worker threads */
-                       pthread_mutex_unlock(&that_thread->ThreadMutex);
                        continue;
                }
                
                if (pthread_equal(that_thread->tid, pthread_self()) && that_thread->thread_func)
                {       /* Sanity check */
-                       pthread_mutex_unlock(&that_thread->ThreadMutex);
                        end_critical_section(S_THREAD_LIST);
                        CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, a thread is trying to clean up after itself.\n");
                        abort();
@@ -650,7 +667,6 @@ void CtdlThreadGC (void)
                
                if (num_threads <= 0)
                {       /* Sanity check */
-                       pthread_mutex_unlock(&that_thread->ThreadMutex);
                        end_critical_section(S_THREAD_LIST);
                        CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n");
                        abort();
@@ -668,7 +684,6 @@ void CtdlThreadGC (void)
                if(that_thread->next)
                        that_thread->next->prev = that_thread->prev;
                
-               pthread_mutex_unlock(&that_thread->ThreadMutex);
                pthread_cond_signal(&that_thread->ThreadCond);
                pthread_cond_signal(&that_thread->SleepCond);   // Make sure this thread is awake
                pthread_mutex_lock(&that_thread->ThreadMutex);  // Make sure it has done what its doing
@@ -698,18 +713,18 @@ void CtdlThreadGC (void)
                pthread_attr_destroy(&that_thread->attr);
                free(that_thread);
        }
+       sys_workers = num_workers;
+       end_critical_section(S_THREAD_LIST);
        
        /* Sanity check number of worker threads */
-       if (workers != num_workers)
+       if (workers != sys_workers)
        {
-               end_critical_section(S_THREAD_LIST);
                CtdlLogPrintf(CTDL_EMERG,
                        "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n",
-                       workers, num_workers
+                       workers, sys_workers
                        );
                abort();
        }
-       end_critical_section(S_THREAD_LIST);
 }
 
 
@@ -722,7 +737,7 @@ void CtdlThreadGC (void)
  */ 
 static void *ctdl_internal_thread_func (void *arg)
 {
-       struct CtdlThreadNode *this_thread;
+       CtdlThreadNode *this_thread;
        void *ret = NULL;
 
        /* lock and unlock the thread list.
@@ -730,7 +745,7 @@ static void *ctdl_internal_thread_func (void *arg)
         * can continue its execution.
         */
        begin_critical_section(S_THREAD_LIST);
-       this_thread = (struct CtdlThreadNode *) arg;
+       this_thread = (CtdlThreadNode *) arg;
        gettimeofday(&this_thread->start_time, NULL);           /* Time this thread started */
        pthread_mutex_lock(&this_thread->ThreadMutex);
        
@@ -786,10 +801,10 @@ static void *ctdl_internal_thread_func (void *arg)
  * Internal function to create a thread.
  * Must be called from within a S_THREAD_LIST critical section
  */ 
-struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args)
+CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args)
 {
        int ret = 0;
-       struct CtdlThreadNode *this_thread;
+       CtdlThreadNode *this_thread;
 
        if (num_threads >= 32767)
        {
@@ -797,13 +812,13 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void
                return NULL;
        }
                
-       this_thread = malloc(sizeof(struct CtdlThreadNode));
+       this_thread = malloc(sizeof(CtdlThreadNode));
        if (this_thread == NULL) {
                CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
                return NULL;
        }
        // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
-       memset (this_thread, 0, sizeof(struct CtdlThreadNode));
+       memset (this_thread, 0, sizeof(CtdlThreadNode));
        
        /* Create the mutex's early so we can use them */
        pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
@@ -913,9 +928,9 @@ struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void
  * char *name = name to give to thread, if NULL, use generic name
  * int flags = flags to determine type of thread and standard facilities
  */
-struct CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args)
+CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args)
 {
-       struct CtdlThreadNode *ret = NULL;
+       CtdlThreadNode *ret = NULL;
        
        begin_critical_section(S_THREAD_LIST);
        ret = ctdl_internal_create_thread(name, flags, thread_func, args);
@@ -929,10 +944,10 @@ struct CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_f
  * Internal function to schedule a thread.
  * Must be called from within a S_THREAD_LIST critical section
  */ 
-struct CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) (void *arg), void *args, time_t when)
+CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) (void *arg), void *args, time_t when)
 {
        int ret = 0;
-       struct CtdlThreadNode *this_thread;
+       CtdlThreadNode *this_thread;
 
        if (num_threads >= 32767)
        {
@@ -940,13 +955,13 @@ struct CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread
                return NULL;
        }
                
-       this_thread = malloc(sizeof(struct CtdlThreadNode));
+       this_thread = malloc(sizeof(CtdlThreadNode));
        if (this_thread == NULL) {
                CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
                return NULL;
        }
        // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
-       memset (this_thread, 0, sizeof(struct CtdlThreadNode));
+       memset (this_thread, 0, sizeof(CtdlThreadNode));
        
        /* Create the mutex's early so we can use them */
        pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
@@ -1026,7 +1041,7 @@ struct CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread
 
 
 
-struct CtdlThreadNode *ctdl_thread_internal_start_scheduled (struct CtdlThreadNode *this_thread)
+CtdlThreadNode *ctdl_thread_internal_start_scheduled (CtdlThreadNode *this_thread)
 {
        int ret = 0;
        
@@ -1060,7 +1075,7 @@ struct CtdlThreadNode *ctdl_thread_internal_start_scheduled (struct CtdlThreadNo
 
 void ctdl_thread_internal_check_scheduled(void)
 {
-       struct CtdlThreadNode *this_thread, *that_thread;
+       CtdlThreadNode *this_thread, *that_thread;
        time_t now;
        
        if (try_critical_section(S_SCHEDULE_LIST))
@@ -1145,29 +1160,20 @@ int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds
 
 
 
+void *new_worker_thread(void *arg);
+extern void close_masters (void);
 
 
 
 void go_threading(void)
 {
        int i;
-       struct CtdlThreadNode *last_worker;
+       CtdlThreadNode *last_worker;
        
        /*
         * Initialise the thread system
         */
        ctdl_thread_internal_init();
-       /*
-        * Now create a bunch of worker threads.
-        */
-//     CtdlLogPrintf(CTDL_DEBUG, "Starting %d worker threads\n", config.c_min_workers);
-//     begin_critical_section(S_THREAD_LIST);
-//     i=0;    /* Always start at least 1 worker thread */
-//     do
-//     {
-//             ctdl_internal_create_thread("Worker Thread", CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER, worker_thread, NULL);
-//     } while (++i < config.c_min_workers);
-//     end_critical_section(S_THREAD_LIST);
 
        /* Second call to module init functions now that threading is up */
        initialise_modules(1);
@@ -1189,7 +1195,10 @@ void go_threading(void)
                if (CT->signal)
                        exit_signal = CT->signal;
                if (exit_signal)
+               {
                        CtdlThreadStopAll();
+//                     close_masters();
+               }
                check_sched_shutdown();
                if (CT->state > CTDL_THREAD_STOP_REQ)
                {
@@ -1201,7 +1210,7 @@ void go_threading(void)
                }
                
                /* Reduce the size of the worker thread pool if necessary. */
-               if ((CtdlThreadGetWorkers() > config.c_min_workers) && (CtdlThreadWorkerAvg < 20) && (CT->state > CTDL_THREAD_STOP_REQ))
+               if ((CtdlThreadGetWorkers() > config.c_min_workers + 1) && (CtdlThreadWorkerAvg < 20) && (CT->state > CTDL_THREAD_STOP_REQ))
                {
                        /* Ask a worker thread to stop as we no longer need it */
                        begin_critical_section(S_THREAD_LIST);
@@ -1209,7 +1218,7 @@ void go_threading(void)
                        while (last_worker)
                        {
                                pthread_mutex_lock(&last_worker->ThreadMutex);
-                               if (last_worker->flags & CTDLTHREAD_WORKER && last_worker->state > CTDL_THREAD_STOPPING)
+                               if (last_worker->flags & CTDLTHREAD_WORKER && (last_worker->state > CTDL_THREAD_STOPPING) && (last_worker->Context == NULL))
                                {
                                        pthread_mutex_unlock(&last_worker->ThreadMutex);
                                        break;
@@ -1237,21 +1246,27 @@ void go_threading(void)
                /* FIXME: come up with a better way to dynamically alter the number of threads
                 * based on the system load
                 */
-//             if ((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkers() < num_sessions))
-               // && (CtdlThreadLoadAvg < 90) )
+#ifdef NEW_WORKER
+               if ((((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkers() <= num_sessions) ) || CtdlThreadGetWorkers() < config.c_min_workers) && (CT->state > CTDL_THREAD_STOP_REQ))
+#else
                if ((((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkerAvg() > 60) && (CtdlThreadGetLoadAvg() < 90) ) || CtdlThreadGetWorkers() < config.c_min_workers) && (CT->state > CTDL_THREAD_STOP_REQ))
+#endif /* NEW_WORKER */
                {
                        for (i=0; i<5 ; i++)
-//                     for (i=0; i< (num_sessions - CtdlThreadGetWorkers()) ; i++)
-//                     for (i=0; i< (10 - (55 - CtdlThreadWorkerAvg) / CtdlThreadWorkerAvg / CtdlThreadGetWorkers()) ; i++)
                        {
-//                             begin_critical_section(S_THREAD_LIST);
+#ifdef NEW_WORKER
+                               CtdlThreadCreate("Worker Thread",
+                                       CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER,
+                                       new_worker_thread,
+                                       NULL
+                                       );
+#else
                                CtdlThreadCreate("Worker Thread",
                                        CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER,
                                        worker_thread,
                                        NULL
                                        );
-//                             end_critical_section(S_THREAD_LIST);
+#endif /* NEW_WORKER */
                        }
                }
                
@@ -1270,3 +1285,253 @@ void go_threading(void)
         */
        ctdl_thread_internal_cleanup();
 }
+
+
+
+
+/*
+ * Starting a new implimentation of a worker thread.
+ * This new implimentation will be faster and do more work per thread.
+ */
+/*
+ * Select on master socket.
+ * First worker thread in here acquires the lock and builds an FDSET of master sockets.
+ * then it goes into a loop selecting on the master sockets timing out every few milliseconds.
+ * If it times out it rebiulds its list and loops.
+ * If the select succeeds it creates a new context and returns.
+ * During this time the other workers are selecting on existing contexts or sleeping.
+ */
+void select_on_master(void)
+{
+        fd_set readfds;
+        struct ServiceFunctionHook *serviceptr;
+        int ssock;                     /* Descriptor for client socket */
+        int highest;
+        int m, i;
+        int retval = 0;
+        struct timeval tv;
+        struct CitContext *con;
+        const char *old_name;
+
+
+
+        old_name = CtdlThreadName("select_on_master");
+
+        /* Initialize the fdset. */
+        FD_ZERO(&readfds);
+        highest = 0;
+
+        /* First, add the various master sockets to the fdset. */
+        for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) {
+                m = serviceptr->msock;
+                FD_SET(m, &readfds);
+                if (m > highest) {
+                        highest = m;
+                }
+        }
+
+        tv.tv_sec = 1;         /* wake up every 1 sec if no input */
+        tv.tv_usec = 0;
+        retval = CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv);
+
+        /* Select got an error or we are shutting down so get out */
+        if (retval == 0 || CtdlThreadCheckStop()) {
+                CtdlThreadName(old_name);
+                return;
+        }
+
+        /* Select says something happened on one of our master sockets so now we handle it */
+        for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) {
+                if (FD_ISSET(serviceptr->msock, &readfds)) {
+                        ssock = accept(serviceptr->msock, NULL, 0);
+                        if (ssock >= 0) {
+                                CtdlLogPrintf(CTDL_DEBUG, "New client socket %d\n", ssock);
+                                /* The master socket is non-blocking but the client
+                                 * sockets need to be blocking, otherwise certain
+                                 * operations barf on FreeBSD.  Not a fatal error.
+                                 */
+                                if (fcntl(ssock, F_SETFL, 0) < 0) {
+                                        CtdlLogPrintf(CTDL_EMERG,
+                                                      "citserver: Can't set socket to blocking: %s\n",
+                                                      strerror(errno));
+                                }
+
+                                /* New context will be created already
+                                 * set up in the CON_EXECUTING state.
+                                 */
+                                con = CreateNewContext();
+                                CT->Context = con;
+
+                                /* Assign our new socket number to it. */
+                                con->client_socket = ssock;
+                                con->h_command_function = serviceptr->h_command_function;
+                                con->h_async_function = serviceptr->h_async_function;
+                                con->ServiceName = serviceptr->ServiceName;
+                                /* Determine whether it's a local socket */
+                                if (serviceptr->sockpath != NULL)
+                                        con->is_local_socket = 1;
+
+                                /* Set the SO_REUSEADDR socket option */
+                                i = 1;
+                                setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
+
+                                become_session(con);
+                                begin_session(con);
+                                serviceptr->h_greeting_function();
+                                become_session(NULL);
+                                con->state = CON_IDLE;
+                                break;
+                        }
+                }
+        }
+
+        CtdlThreadName(old_name);
+}
+
+/*
+ * Select on client socket.
+ * First worker thread in here acquires the lock and builds an FDSET of client sockets.
+ * then it selects on the client sockets timing out after 1 second.
+ * If it times out the thread goes off to check on housekeeping etc.
+ * If the select succeeds the thread goes off to handle the client request.
+ * If the list of client connections is empty the threads all sleep for one second
+ */
+struct CitContext *select_on_client(void)
+{
+       fd_set readfds;
+       struct timeval tv;
+       int retval = 0;
+       int highest=0;
+       const char *old_name;
+       
+       
+       old_name = CtdlThreadName("select_on_client");
+       
+       /* Initialise the fdset */
+       FD_ZERO(&readfds);
+       FD_SET(CT->Context->client_socket, &readfds);
+       highest = CT->Context->client_socket;   
+       /* Now we can select on any connections that are waiting */
+       
+       if (!CtdlThreadCheckStop())
+       {
+               tv.tv_sec = config.c_sleeping;          /* wake up every second if no input */
+               tv.tv_usec = 0;
+               retval = select(highest + 1, &readfds, NULL, NULL, &tv);
+       }
+       else    /* Shutting down? */
+       {
+               CtdlThreadName(old_name);
+               return(NULL);
+       }
+               
+
+       /* Now figure out who made this select() unblock.
+        * First, check for an error or exit condition.
+        */
+       if (retval < 0) {
+               if (errno == EBADF) {
+                       CtdlLogPrintf(CTDL_NOTICE, "select() failed: (%s)\n",
+                               strerror(errno));
+               }
+               if (errno != EINTR) {
+                       CtdlLogPrintf(CTDL_EMERG, "Exiting (%s)\n", strerror(errno));
+                       CtdlThreadStopAll();
+               } else if (!CtdlThreadCheckStop()) {
+                       CtdlLogPrintf(CTDL_DEBUG, "Un handled select failure.\n");
+               }
+               CtdlThreadName(old_name);
+               return NULL;
+       }
+       else if(retval == 0)
+       {
+               CtdlThreadName(old_name);
+               CT->Context->kill_me = 1;
+               CT->Context = NULL;
+               return CT->Context;
+       }
+       
+       CT->Context->state = CON_EXECUTING;
+       CT->Context->input_waiting = 1;
+       
+       CtdlThreadName(old_name);
+       return (CT->Context);
+}
+
+
+
+/*
+ * Do the worker threads work when needed
+ */
+int execute_session(struct CitContext *bind_me)
+{
+       int force_purge;
+       
+       become_session(bind_me);
+
+       /* If the client has sent a command, execute it. */
+       if (CC->input_waiting) {
+               CC->h_command_function();
+               CC->input_waiting = 0;
+       }
+
+       /* If there are asynchronous messages waiting and the
+        * client supports it, do those now */
+       if ((CC->is_async) && (CC->async_waiting)
+          && (CC->h_async_function != NULL)) {
+               CC->h_async_function();
+               CC->async_waiting = 0;
+       }
+               
+       force_purge = CC->kill_me;
+       if (force_purge)
+               CT->Context = NULL;
+       become_session(NULL);
+       bind_me->state = CON_IDLE;
+       return force_purge;
+}
+
+
+
+extern void dead_session_purge(int force);
+
+/*
+ * A new worker_thread loop.
+ */
+void *new_worker_thread(void *arg)
+{
+       struct CitContext *bind_me;
+       int force_purge;
+       
+       while (!CtdlThreadCheckStop()) {
+
+               /* make doubly sure we're not holding any stale db handles
+                * which might cause a deadlock.
+                */
+               cdb_check_handles();
+               force_purge = 0;
+               bind_me = NULL;         /* Which session shall we handle? */
+                       
+               if (CT->Context == NULL)
+                       select_on_master();
+               if (CtdlThreadCheckStop())
+                       break;
+                       
+               if (CT->Context)
+                       bind_me = select_on_client();
+               if (CtdlThreadCheckStop())
+                       break;
+                       
+               if (bind_me)
+                       force_purge = execute_session(bind_me);
+                       
+               dead_session_purge(force_purge);
+               if (CtdlThreadCheckStop())
+                       break;
+                       
+               do_housekeeping();
+       }
+       return NULL;
+}
index 5d21101386d89b22b5476ff931f71f70a5856734..cbffc273eabaab41c325566e903186c8c5478428 100644 (file)
@@ -39,12 +39,13 @@ enum CtdlThreadState {
        CTDL_THREAD_RUNNING,
        CTDL_THREAD_LAST_STATE
 };
+typedef struct CtdlThreadNode CtdlThreadNode;
 
-extern struct CtdlThreadNode {
+struct CtdlThreadNode{
        pthread_t tid;                          /* id as returned by pthread_create() */
        pid_t pid;                              /* pid, as best the OS will let us determine */
        time_t when;                            /* When to start a scheduled thread */
-       struct CitConext *Context;              /* The session context that this thread mught be working on or NULL if none */
+       struct CitContext *Context;             /* The session context that this thread mught be working on or NULL if none */
        long number;                            /* A unigue number for this thread (not implimented yet) */
        int wakefd_recv;                        /* An fd that this thread can sleep on (not implimented yet) */
        int wakefd_send;                        /* An fd that this thread can send out on (Not implimented yet) */
@@ -65,15 +66,19 @@ extern struct CtdlThreadNode {
        double avg_running;                     /* Average running time */
        double avg_blocked;                     /* Average blocked time */
        double load_avg;                        /* Load average for this thread */
-       struct CtdlThreadNode *prev;            /* Previous thread in the thread table */
-       struct CtdlThreadNode *next;            /* Next thread in the thread table */
-} *CtdlThreadList;
+       CtdlThreadNode *prev;           /* Previous thread in the thread table */
+       CtdlThreadNode *next;           /* Next thread in the thread table */
+} ;
+extern CtdlThreadNode *CtdlThreadList;
 
-typedef struct {
+typedef struct ThreadTSD ThreadTSD;
+
+struct ThreadTSD {
        DB_TXN *tid;            /* Transaction handle */
        DBC *cursors[MAXCDB];   /* Cursors, for traversals... */
-       struct CtdlThreadNode *self;    /* Pointer to this threads control structure */
-}ThreadTSD ;
+       CtdlThreadNode *self;   /* Pointer to this threads control structure */
+} ;
 
 extern double CtdlThreadLoadAvg;
 extern double CtdlThreadWorkerAvg;
@@ -85,7 +90,7 @@ void ctdl_thread_internal_init(void);
 void ctdl_thread_internal_cleanup(void);
 void ctdl_thread_internal_calc_loadavg(void);
 void ctdl_thread_internal_free_tsd(void);
-struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args);
+CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args);
 void ctdl_thread_internal_check_scheduled(void);
 
 void InitialiseSemaphores(void);