Removed two unused mutexes.
[citadel.git] / citadel / threads.c
/*
 * $Id: sysdep.c 5882 2007-12-13 19:46:05Z davew $
 *
 * Citadel "system dependent" stuff.
 * See copyright.txt for copyright information.
 *
 * This is where the Citadel thread implementation lives.
 *
 */
10
11 #include <errno.h>
12 #include <sys/socket.h>
13 #include <unistd.h>
14 #include <fcntl.h>
15
16 #if TIME_WITH_SYS_TIME
17 # include <sys/time.h>
18 # include <time.h>
19 #else
20 # if HAVE_SYS_TIME_H
21 #  include <sys/time.h>
22 # else
23 #  include <time.h>
24 # endif
25 #endif
26
27 #include "threads.h"
28 #include "ctdl_module.h"
29 #include "modules_init.h"
30 #include "housekeeping.h"
31 #include "config.h"
32 #include "citserver.h"
33 #include "sysdep_decls.h"
34
35 /*
36  * define this to use the new worker_thread method of handling connections
37  */
38 //#define NEW_WORKER
39
40 /*
41  * New thread interface.
42  * To create a thread you must call one of the create thread functions.
43  * You must pass it the address of (a pointer to a CtdlThreadNode initialised to NULL) like this
44  * struct CtdlThreadNode *node = NULL;
45  * pass in &node
46  * If the thread is created *node will point to the thread control structure for the created thread.
47  * If the thread creation fails *node remains NULL
48  * Do not free the memory pointed to by *node, it doesn't belong to you.
49  * This new interface duplicates much of the eCrash stuff. We should go for closer integration since that would
50  * remove the need for the calls to eCrashRegisterThread and friends
51  */
52
/* Number of nodes registered on CtdlThreadList; CtdlThreadGC() decrements it for each node it reaps. */
static int num_threads = 0;                     /* Current number of threads */
/* Number of registered nodes carrying the CTDLTHREAD_WORKER flag; CtdlThreadGC() cross-checks it. */
static int num_workers = 0;                     /* Current number of worker threads */

/*
 * Doubly linked (next/prev) list of registered thread nodes.
 * ctdl_thread_internal_init() pushes the garbage collector's node onto its
 * head; CtdlThreadStopAll(), CtdlThreadWakeAll() and CtdlThreadGC() walk it
 * while holding S_THREAD_LIST.
 */
CtdlThreadNode *CtdlThreadList = NULL;
/*
 * Second list of thread nodes, linked through next. Its nodes are destroyed
 * and freed by ctdl_thread_internal_cleanup(). Presumably it holds threads
 * scheduled for later creation — TODO confirm.
 */
CtdlThreadNode *CtdlThreadSchedList = NULL;

/* pthread_t of the thread that ran ctdl_thread_internal_init(); that thread acts as the garbage collector. */
static pthread_t GC_thread;
/* Printable name per thread state: strdup()ed in ctdl_thread_internal_init(), freed in ctdl_thread_internal_cleanup(). */
static char *CtdlThreadStates[CTDL_THREAD_LAST_STATE];
/* System-wide and worker-only load averages, recomputed by ctdl_thread_internal_calc_loadavg(). */
double CtdlThreadLoadAvg = 0;
double CtdlThreadWorkerAvg = 0;
/* Key under which each thread stores its ThreadTSD; created by ctdl_thread_internal_init_tsd(). */
pthread_key_t ThreadKey;

pthread_mutex_t Critters[MAX_SEMAPHORES];       /* One mutex per critical section id (S_*), set up by InitialiseSemaphores() */
72
73
74
75 void InitialiseSemaphores(void)
76 {
77         int i;
78
79         /* Set up a bunch of semaphores to be used for critical sections */
80         for (i=0; i<MAX_SEMAPHORES; ++i) {
81                 pthread_mutex_init(&Critters[i], NULL);
82         }
83 }
84
85
86
87
88 /*
89  * Obtain a semaphore lock to begin a critical section.
90  * but only if no one else has one
91  */
92 int try_critical_section(int which_one)
93 {
94         /* For all types of critical sections except those listed here,
95          * ensure nobody ever tries to do a critical section within a
96          * transaction; this could lead to deadlock.
97          */
98         if (    (which_one != S_FLOORCACHE)
99 #ifdef DEBUG_MEMORY_LEAKS
100                 && (which_one != S_DEBUGMEMLEAKS)
101 #endif
102                 && (which_one != S_RPLIST)
103         ) {
104                 cdb_check_handles();
105         }
106         return (pthread_mutex_trylock(&Critters[which_one]));
107 }
108
109
110 /*
111  * Obtain a semaphore lock to begin a critical section.
112  */
113 void begin_critical_section(int which_one)
114 {
115         /* CtdlLogPrintf(CTDL_DEBUG, "begin_critical_section(%d)\n", which_one); */
116
117         /* For all types of critical sections except those listed here,
118          * ensure nobody ever tries to do a critical section within a
119          * transaction; this could lead to deadlock.
120          */
121         if (    (which_one != S_FLOORCACHE)
122 #ifdef DEBUG_MEMORY_LEAKS
123                 && (which_one != S_DEBUGMEMLEAKS)
124 #endif
125                 && (which_one != S_RPLIST)
126         ) {
127                 cdb_check_handles();
128         }
129         pthread_mutex_lock(&Critters[which_one]);
130 }
131
132 /*
133  * Release a semaphore lock to end a critical section.
134  */
135 void end_critical_section(int which_one)
136 {
137         pthread_mutex_unlock(&Critters[which_one]);
138 }
139
140
/*
 * Release a thread's ThreadTSD block.
 *
 * Registered as the ThreadKey destructor and also called directly by
 * ctdl_thread_internal_free_tsd(), which may pass NULL; NULL is ignored.
 * check_handles() is run on the block before it is freed.
 */
static void ctdl_thread_internal_dest_tsd(void *arg)
{
	if (arg == NULL)
		return;

	check_handles(arg);
	free(arg);
}
151
152
153 /*
154  * A function to initialise the thread TSD
155  */
156 void ctdl_thread_internal_init_tsd(void)
157 {
158         int ret;
159         
160         if ((ret = pthread_key_create(&ThreadKey, ctdl_thread_internal_dest_tsd))) {
161                 lprintf(CTDL_EMERG, "pthread_key_create: %s\n",
162                         strerror(ret));
163                 exit(CTDLEXIT_DB);
164         }
165 }
166
167 /*
168  * Ensure that we have a key for thread-specific data. 
169  *
170  * This should be called immediately after startup by any thread 
171  * 
172  */
173 void CtdlThreadAllocTSD(void)
174 {
175         ThreadTSD *tsd;
176
177         if (pthread_getspecific(ThreadKey) != NULL)
178                 return;
179
180         tsd = malloc(sizeof(ThreadTSD));
181
182         tsd->tid = NULL;
183
184         memset(tsd->cursors, 0, sizeof tsd->cursors);
185         tsd->self = NULL;
186         
187         pthread_setspecific(ThreadKey, tsd);
188 }
189
190
191 void ctdl_thread_internal_free_tsd(void)
192 {
193         ctdl_thread_internal_dest_tsd(pthread_getspecific(ThreadKey));
194         pthread_setspecific(ThreadKey, NULL);
195 }
196
197
198 void ctdl_thread_internal_cleanup(void)
199 {
200         int i;
201         CtdlThreadNode *this_thread, *that_thread;
202         
203         for (i=0; i<CTDL_THREAD_LAST_STATE; i++)
204         {
205                 free (CtdlThreadStates[i]);
206         }
207         
208         /* Clean up the scheduled thread list */
209         this_thread = CtdlThreadSchedList;
210         while (this_thread)
211         {
212                 that_thread = this_thread;
213                 this_thread = this_thread->next;
214                 pthread_mutex_destroy(&that_thread->ThreadMutex);
215                 pthread_cond_destroy(&that_thread->ThreadCond);
216                 pthread_mutex_destroy(&that_thread->SleepMutex);
217                 pthread_cond_destroy(&that_thread->SleepCond);
218                 pthread_attr_destroy(&that_thread->attr);
219                 free(that_thread);
220         }
221         ctdl_thread_internal_free_tsd();
222 }
223
224 void ctdl_thread_internal_init(void)
225 {
226         CtdlThreadNode *this_thread;
227         int ret = 0;
228         
229         GC_thread = pthread_self();
230         CtdlThreadStates[CTDL_THREAD_INVALID] = strdup ("Invalid Thread");
231         CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread");
232         CtdlThreadStates[CTDL_THREAD_CREATE] = strdup("Thread being Created");
233         CtdlThreadStates[CTDL_THREAD_CANCELLED] = strdup("Thread Cancelled");
234         CtdlThreadStates[CTDL_THREAD_EXITED] = strdup("Thread Exited");
235         CtdlThreadStates[CTDL_THREAD_STOPPING] = strdup("Thread Stopping");
236         CtdlThreadStates[CTDL_THREAD_STOP_REQ] = strdup("Thread Stop Requested");
237         CtdlThreadStates[CTDL_THREAD_SLEEPING] = strdup("Thread Sleeping");
238         CtdlThreadStates[CTDL_THREAD_RUNNING] = strdup("Thread Running");
239         CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked");
240         
241         /* Get ourself a thread entry */
242         this_thread = malloc(sizeof(CtdlThreadNode));
243         if (this_thread == NULL) {
244                 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
245                 return;
246         }
247         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
248         memset (this_thread, 0, sizeof(CtdlThreadNode));
249         
250         pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
251         pthread_cond_init (&(this_thread->ThreadCond), NULL);
252         pthread_mutex_init (&(this_thread->SleepMutex), NULL);
253         pthread_cond_init (&(this_thread->SleepCond), NULL);
254         
255         /* We are garbage collector so create us as running */
256         this_thread->state = CTDL_THREAD_RUNNING;
257         
258         if ((ret = pthread_attr_init(&this_thread->attr))) {
259                 CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_init: %s\n", strerror(ret));
260                 free(this_thread);
261                 return;
262         }
263
264         this_thread->name = "Garbage Collection Thread";
265         
266         this_thread->tid = GC_thread;
267         CT = this_thread;
268         
269         num_threads++;  // Increase the count of threads in the system.
270
271         this_thread->next = CtdlThreadList;
272         CtdlThreadList = this_thread;
273         if (this_thread->next)
274                 this_thread->next->prev = this_thread;
275         /* Set up start times */
276         gettimeofday(&this_thread->start_time, NULL);           /* Time this thread started */
277         memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval));     /* Changed state so mark it. */
278 }
279
280
281 /*
282  * A function to update a threads load averages
283  */
284  void ctdl_thread_internal_update_avgs(CtdlThreadNode *this_thread)
285  {
286         struct timeval now, result;
287         double last_duration;
288
289         gettimeofday(&now, NULL);
290         timersub(&now, &(this_thread->last_state_change), &result);
291         /* I don't think these mutex's are needed here */
292         pthread_mutex_lock(&this_thread->ThreadMutex);
293         // result now has a timeval for the time we spent in the last state since we last updated
294         last_duration = (double)result.tv_sec + ((double)result.tv_usec / (double) 1000000);
295         if (this_thread->state == CTDL_THREAD_SLEEPING)
296                 this_thread->avg_sleeping += last_duration;
297         if (this_thread->state == CTDL_THREAD_RUNNING)
298                 this_thread->avg_running += last_duration;
299         if (this_thread->state == CTDL_THREAD_BLOCKED)
300                 this_thread->avg_blocked += last_duration;
301         memcpy (&this_thread->last_state_change, &now, sizeof (struct timeval));
302         pthread_mutex_unlock(&this_thread->ThreadMutex);
303 }
304
305 /*
306  * A function to chenge the state of a thread
307  */
308 void ctdl_thread_internal_change_state (CtdlThreadNode *this_thread, enum CtdlThreadState new_state)
309 {
310         /*
311          * Wether we change state or not we need update the load values
312          */
313         ctdl_thread_internal_update_avgs(this_thread);
314         /* This mutex not needed here? */
315         pthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */
316         if ((new_state == CTDL_THREAD_STOP_REQ) && (this_thread->state > CTDL_THREAD_STOP_REQ))
317                 this_thread->state = new_state;
318         if (((new_state == CTDL_THREAD_SLEEPING) || (new_state == CTDL_THREAD_BLOCKED)) && (this_thread->state == CTDL_THREAD_RUNNING))
319                 this_thread->state = new_state;
320         if ((new_state == CTDL_THREAD_RUNNING) && ((this_thread->state == CTDL_THREAD_SLEEPING) || (this_thread->state == CTDL_THREAD_BLOCKED)))
321                 this_thread->state = new_state;
322         pthread_mutex_unlock(&this_thread->ThreadMutex);
323 }
324
325
326 /*
327  * A function to tell all threads to exit
328  */
329 void CtdlThreadStopAll(void)
330 {
331         //FIXME: The signalling of the condition should not be in the critical_section
332         // We need to build a list of threads we are going to signal and then signal them afterwards
333         
334         CtdlThreadNode *this_thread;
335         
336         begin_critical_section(S_THREAD_LIST);
337         this_thread = CtdlThreadList;
338         while(this_thread)
339         {
340 #ifdef THREADS_USESIGNALS
341                 pthread_kill(this_thread->tid, SIGHUP);
342 #endif
343                 ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ);
344                 pthread_cond_signal(&this_thread->ThreadCond);
345                 pthread_cond_signal(&this_thread->SleepCond);
346                 CtdlLogPrintf(CTDL_DEBUG, "Thread system stopping thread \"%s\" (%ld).\n", this_thread->name, this_thread->tid);
347                 this_thread = this_thread->next;
348         }
349         end_critical_section(S_THREAD_LIST);
350 }
351
352
353 /*
354  * A function to wake up all sleeping threads
355  */
356 void CtdlThreadWakeAll(void)
357 {
358         CtdlThreadNode *this_thread;
359         
360         CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n");
361         
362         begin_critical_section(S_THREAD_LIST);
363         this_thread = CtdlThreadList;
364         while(this_thread)
365         {
366                 if (!this_thread->thread_func)
367                 {
368                         pthread_cond_signal(&this_thread->ThreadCond);
369                         pthread_cond_signal(&this_thread->SleepCond);
370                 }
371                 this_thread = this_thread->next;
372         }
373         end_critical_section(S_THREAD_LIST);
374 }
375
376
377 /*
378  * A function to return the number of threads running in the system
379  */
380 int CtdlThreadGetCount(void)
381 {
382         return  num_threads;
383 }
384
385 int CtdlThreadGetWorkers(void)
386 {
387         return  num_workers;
388 }
389
390 double CtdlThreadGetWorkerAvg(void)
391 {
392         double ret;
393         
394         begin_critical_section(S_THREAD_LIST);
395         ret =  CtdlThreadWorkerAvg;
396         end_critical_section(S_THREAD_LIST);
397         return ret;
398 }
399
400 double CtdlThreadGetLoadAvg(void)
401 {
402         double ret;
403         
404         begin_critical_section(S_THREAD_LIST);
405         ret =  CtdlThreadLoadAvg;
406         end_critical_section(S_THREAD_LIST);
407         return ret;
408 }
409
410
411
412
413 /*
414  * A function to rename a thread
415  * Returns a const char *
416  */
417 const char *CtdlThreadName(const char *name)
418 {
419         const char *old_name;
420         
421         if (!CT)
422         {
423                 CtdlLogPrintf(CTDL_WARNING, "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n", name);
424                 return NULL;
425         }
426         old_name = CT->name;
427         if (name)
428                 CT->name = name;
429         return (old_name);
430 }       
431
432
433 /*
434  * A function to force a thread to exit
435  */
436 void CtdlThreadCancel(CtdlThreadNode *thread)
437 {
438         CtdlThreadNode *this_thread;
439         
440         if (!thread)
441                 this_thread = CT;
442         else
443                 this_thread = thread;
444         if (!this_thread)
445         {
446                 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n");
447                 CtdlThreadStopAll();
448                 return;
449         }
450         
451         if (!this_thread->thread_func)
452         {
453                 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n");
454                 CtdlThreadStopAll();
455                 return;
456         }
457         
458         ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_CANCELLED);
459         pthread_cancel(this_thread->tid);
460 }
461
462
463
464 /*
465  * A function for a thread to check if it has been asked to stop
466  */
467 int CtdlThreadCheckStop(void)
468 {
469         int state;
470         
471         if (!CT)
472         {
473                 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n");
474                 CtdlThreadStopAll();
475                 return -1;
476         }
477         
478         state = CT->state;
479
480 #ifdef THREADS_USERSIGNALS
481         if (CT->signal)
482                 CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" caught signal %d.\n", CT->name, CT->signal);
483 #endif
484         if(state == CTDL_THREAD_STOP_REQ)
485         {
486                 CT->state = CTDL_THREAD_STOPPING;
487                 return -1;
488         }
489         else if((state < CTDL_THREAD_STOP_REQ) && (state > CTDL_THREAD_CREATE))
490         {
491                 return -1;
492         }
493         return 0;
494 }
495
496
497 /*
498  * A function to ask a thread to exit
499  * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit
500  */
501 void CtdlThreadStop(CtdlThreadNode *thread)
502 {
503         CtdlThreadNode *this_thread;
504         
505         if (!thread)
506                 this_thread = CT;
507         else
508                 this_thread = thread;
509         if (!this_thread)
510                 return;
511         if (!(this_thread->thread_func))
512                 return;         // Don't stop garbage collector
513 #ifdef THREADS_USESIGNALS
514         pthread_kill(this_thread->tid, SIGHUP); 
515 #endif
516         ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ);
517         pthread_cond_signal(&this_thread->ThreadCond);
518         pthread_cond_signal(&this_thread->SleepCond);
519 }
520
521 /*
522  * So we now have a sleep command that works with threads but it is in seconds
523  */
524 void CtdlThreadSleep(int secs)
525 {
526         struct timespec wake_time;
527         struct timeval time_now;
528         
529         
530         if (!CT)
531         {
532                 CtdlLogPrintf(CTDL_WARNING, "CtdlThreadSleep() called by something that is not a thread. Should we die?\n");
533                 return;
534         }
535         
536         memset (&wake_time, 0, sizeof(struct timespec));
537         gettimeofday(&time_now, NULL);
538         wake_time.tv_sec = time_now.tv_sec + secs;
539         wake_time.tv_nsec = time_now.tv_usec * 10;
540
541         ctdl_thread_internal_change_state (CT, CTDL_THREAD_SLEEPING);
542         
543         pthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */
544         pthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex, &wake_time);
545         pthread_mutex_unlock(&CT->ThreadMutex);
546         
547         ctdl_thread_internal_change_state (CT, CTDL_THREAD_RUNNING);
548 }
549
550
/*
 * pthread cleanup handler for threads started by ctdl_internal_thread_func().
 *
 * It is pushed with pthread_cleanup_push() there and executed by the
 * matching pthread_cleanup_pop(1) when the user thread function returns.
 * pthread also runs it if the thread is cancelled while the handler is
 * pushed. arg is unused (NULL is passed).
 *
 * Setting CTDL_THREAD_EXITED is what makes CtdlThreadGC() reap this node,
 * which is why it has to be the very last step.
 */
static void ctdl_internal_thread_cleanup(void *arg)
{
	/*
	 * In here we were called by the current thread because it is exiting
	 * NB. WE ARE THE CURRENT THREAD
	 */
	CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n", CT->name, CT->tid);
	
	#ifdef HAVE_BACKTRACE
	eCrash_UnregisterThread();
	#endif
	
	pthread_mutex_lock(&CT->ThreadMutex);
	CT->state = CTDL_THREAD_EXITED;	// needs to be last thing else house keeping will unlink us too early
	pthread_mutex_unlock(&CT->ThreadMutex);
}
570
571 /*
572  * A quick function to show the load averages
573  */
574 void ctdl_thread_internal_calc_loadavg(void)
575 {
576         CtdlThreadNode *that_thread;
577         double load_avg, worker_avg;
578         int workers = 0;
579
580         that_thread = CtdlThreadList;
581         load_avg = 0;
582         worker_avg = 0;
583         while(that_thread)
584         {
585                 /* Update load averages */
586                 ctdl_thread_internal_update_avgs(that_thread);
587                 pthread_mutex_lock(&that_thread->ThreadMutex);
588                 that_thread->load_avg = (that_thread->avg_sleeping + that_thread->avg_running) / (that_thread->avg_sleeping + that_thread->avg_running + that_thread->avg_blocked) * 100;
589                 that_thread->avg_sleeping /= 2;
590                 that_thread->avg_running /= 2;
591                 that_thread->avg_blocked /= 2;
592                 load_avg += that_thread->load_avg;
593                 if (that_thread->flags & CTDLTHREAD_WORKER)
594                 {
595                         worker_avg += that_thread->load_avg;
596                         workers++;
597                 }
598 #ifdef WITH_THREADLOG
599                 CtdlLogPrintf(CTDL_DEBUG, "CtdlThread, \"%s\" (%lu) \"%s\" %.2f %.2f %.2f %.2f\n",
600                         that_thread->name,
601                         that_thread->tid,
602                         CtdlThreadStates[that_thread->state],
603                         that_thread->avg_sleeping,
604                         that_thread->avg_running,
605                         that_thread->avg_blocked,
606                         that_thread->load_avg);
607 #endif
608                 pthread_mutex_unlock(&that_thread->ThreadMutex);
609                 that_thread = that_thread->next;
610         }
611         CtdlThreadLoadAvg = load_avg/num_threads;
612         CtdlThreadWorkerAvg = worker_avg/workers;
613 #ifdef WITH_THREADLOG
614         CtdlLogPrintf(CTDL_INFO, "System load average %.2f, workers averag %.2f, threads %d, workers %d, sessions %d\n", CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads, num_workers, num_sessions);
615 #endif
616 }
617
618
/*
 * Garbage collection routine.
 * Gets called by main() in a loop to clean up the thread list periodically.
 *
 * With S_THREAD_LIST held, every node in state CTDL_THREAD_EXITED is:
 * unlinked from CtdlThreadList, counted out of num_threads (and num_workers
 * for CTDLTHREAD_WORKER nodes), woken, joined with pthread_join(), and
 * finally has its mutexes, condition variables and attributes destroyed
 * before being freed. Nodes in any other state are left alone.
 *
 * When exactly one node remains, that sole node (presumably the garbage
 * collector's own — confirm) is marked exited so that it is reaped too.
 * Joining the calling thread is expected to fail with EDEADLK, which is only
 * logged.
 *
 * abort() is called if a node with a thread_func belongs to the calling
 * thread, if num_threads is already <= 0 while reaping, or if the number of
 * live worker nodes counted differs from num_workers afterwards.
 */
void CtdlThreadGC (void)
{
	CtdlThreadNode *this_thread, *that_thread;
	int workers = 0, sys_workers;
	int ret=0;
	
	begin_critical_section(S_THREAD_LIST);
	
	/* Handle exiting of garbage collector thread */
	if(num_threads == 1)
		CtdlThreadList->state = CTDL_THREAD_EXITED;
	
#ifdef WITH_THREADLOG
	CtdlLogPrintf(CTDL_DEBUG, "Thread system running garbage collection.\n");
#endif
	/*
	 * Woke up to do garbage collection.
	 * Advance this_thread before possibly freeing that_thread.
	 */
	this_thread = CtdlThreadList;
	while(this_thread)
	{
		that_thread = this_thread;
		this_thread = this_thread->next;
		
		/* Do we need to clean up this thread? Only exited nodes are reaped. */
		if (that_thread->state != CTDL_THREAD_EXITED)
		{
			if(that_thread->flags & CTDLTHREAD_WORKER)
				workers++;	/* Sanity check on number of worker threads */
			continue;
		}
		
		if (pthread_equal(that_thread->tid, pthread_self()) && that_thread->thread_func)
		{	/* Sanity check: only the GC node (no thread_func) may be reaped by its own thread */
			end_critical_section(S_THREAD_LIST);
			CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, a thread is trying to clean up after itself.\n");
			abort();
			return;
		}
		
		if (num_threads <= 0)
		{	/* Sanity check */
			end_critical_section(S_THREAD_LIST);
			CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n");
			abort();
			return;
		}

		if(that_thread->flags & CTDLTHREAD_WORKER)
			num_workers--;	/* This is a worker thread so reduce the count. */
		num_threads--;
		/* If we are unlinking the list head then the next becomes the list head */
		if(that_thread->prev)
			that_thread->prev->next = that_thread->next;
		else
			CtdlThreadList = that_thread->next;
		if(that_thread->next)
			that_thread->next->prev = that_thread->prev;
		
		pthread_cond_signal(&that_thread->ThreadCond);
		pthread_cond_signal(&that_thread->SleepCond);	// Make sure this thread is awake
		pthread_mutex_lock(&that_thread->ThreadMutex);	// Make sure it has done what its doing
		pthread_mutex_unlock(&that_thread->ThreadMutex);
		/*
		 * Join on the thread to do clean up and prevent memory leaks.
		 * This also makes sure the thread has cleaned up after itself before we free its node.
		 * We can join on the garbage collector thread; the join should just return EDEADLK.
		 */
		ret = pthread_join (that_thread->tid, NULL);
		if (ret == EDEADLK)
			CtdlLogPrintf(CTDL_DEBUG, "Garbage collection on own thread.\n");
		else if (ret == EINVAL)
			CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, that thread already joined on.\n");
		else if (ret == ESRCH)
			CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, no thread to join on.\n");
		else if (ret != 0)
			CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, pthread_join returned an unknown error.\n");
		/*
		 * Now we own that thread entry: release its sync objects and the node itself.
		 */
		CtdlLogPrintf(CTDL_INFO, "Garbage Collection for thread \"%s\" (%ld).\n", that_thread->name, that_thread->tid);
		pthread_mutex_destroy(&that_thread->ThreadMutex);
		pthread_cond_destroy(&that_thread->ThreadCond);
		pthread_mutex_destroy(&that_thread->SleepMutex);
		pthread_cond_destroy(&that_thread->SleepCond);
		pthread_attr_destroy(&that_thread->attr);
		free(that_thread);
	}
	/* Snapshot the counter while still holding the list lock. */
	sys_workers = num_workers;
	end_critical_section(S_THREAD_LIST);
	
	/* Sanity check number of worker threads */
	if (workers != sys_workers)
	{
		CtdlLogPrintf(CTDL_EMERG,
			"Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n",
			workers, sys_workers
			);
		abort();
	}
}
724
725
726
727  
/*
 * Runtime function for a Citadel Thread.
 * This initialises the thread's environment and then calls the user supplied thread function.
 * Note that this is the REAL thread function and wraps the user's thread function.
 *
 * arg is the CtdlThreadNode describing this thread (presumably passed to
 * pthread_create() by ctdl_internal_create_thread() — confirm).
 * Returns whatever the user's thread_func returned, or NULL if it was never
 * run because a stop had already been requested.
 * On the way out, pthread_cleanup_pop(1) runs ctdl_internal_thread_cleanup(),
 * which marks the node CTDL_THREAD_EXITED so that CtdlThreadGC() can reap it.
 */
static void *ctdl_internal_thread_func (void *arg)
{
	CtdlThreadNode *this_thread;
	void *ret = NULL;

	/* Take the thread list lock and hold it through setup.
	 * The creator holds S_THREAD_LIST while creating us, so this makes the thread
	 * wait until all its creation work has finished before it continues.
	 */
	begin_critical_section(S_THREAD_LIST);
	this_thread = (CtdlThreadNode *) arg;
	gettimeofday(&this_thread->start_time, NULL);		/* Time this thread started */
	pthread_mutex_lock(&this_thread->ThreadMutex);
	
	// Register the cleanup function to take care of when we exit.
	// (push/pop are macros and must stay in the same lexical scope.)
	pthread_cleanup_push(ctdl_internal_thread_cleanup, NULL);
	// Get our thread data structure (must exist before CT can be assigned)
	CtdlThreadAllocTSD();
	CT = this_thread;
	this_thread->pid = getpid();
	memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval));	/* Changed state so mark it. */
	/* Only change to running state if we weren't asked to stop during the create cycle.
	 * Otherwise there is a window that lets this thread's creation continue to fully grown and
	 * thereby prevent a shutdown of the server.
	 */
	pthread_mutex_unlock(&this_thread->ThreadMutex);
		
	if (!CtdlThreadCheckStop())
	{
		pthread_mutex_lock(&this_thread->ThreadMutex);
		this_thread->state = CTDL_THREAD_RUNNING;
		pthread_mutex_unlock(&this_thread->ThreadMutex);
	}
	end_critical_section(S_THREAD_LIST);
	
	// Register for tracing
	#ifdef HAVE_BACKTRACE
	eCrash_RegisterThread(this_thread->name, 0);
	#endif
	
	// Tell the world we are here
	CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n", this_thread->name, this_thread->tid);

	
	
	/*
	 * run the thread to do the work but only if we haven't been asked to stop
	 */
	if (!CtdlThreadCheckStop())
		ret = (this_thread->thread_func)(this_thread->user_args);
	
	/*
	 * Our thread is exiting either because it wanted to end or because the server is stopping
	 * We need to clean up
	 */
	pthread_cleanup_pop(1);	// Execute our cleanup routine and remove it
	
	return(ret);
}
792
793
794  
795 /*
796  * Internal function to create a thread.
797  * Must be called from within a S_THREAD_LIST critical section
798  */ 
799 CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args)
800 {
801         int ret = 0;
802         CtdlThreadNode *this_thread;
803
804         if (num_threads >= 32767)
805         {
806                 CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n");
807                 return NULL;
808         }
809                 
810         this_thread = malloc(sizeof(CtdlThreadNode));
811         if (this_thread == NULL) {
812                 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
813                 return NULL;
814         }
815         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
816         memset (this_thread, 0, sizeof(CtdlThreadNode));
817         
818         /* Create the mutex's early so we can use them */
819         pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
820         pthread_cond_init (&(this_thread->ThreadCond), NULL);
821         pthread_mutex_init (&(this_thread->SleepMutex), NULL);
822         pthread_cond_init (&(this_thread->SleepCond), NULL);
823         
824         pthread_mutex_lock(&this_thread->ThreadMutex);
825         
826         this_thread->state = CTDL_THREAD_CREATE;
827         
828         if ((ret = pthread_attr_init(&this_thread->attr))) {
829                 pthread_mutex_unlock(&this_thread->ThreadMutex);
830                 pthread_mutex_destroy(&(this_thread->ThreadMutex));
831                 pthread_cond_destroy(&(this_thread->ThreadCond));
832                 pthread_mutex_destroy(&(this_thread->SleepMutex));
833                 pthread_cond_destroy(&(this_thread->SleepCond));
834                 CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_init: %s\n", strerror(ret));
835                 free(this_thread);
836                 return NULL;
837         }
838
839         /* Our per-thread stacks need to be bigger than the default size,
840          * otherwise the MIME parser crashes on FreeBSD, and the IMAP service
841          * crashes on 64-bit Linux.
842          */
843         if (flags & CTDLTHREAD_BIGSTACK)
844         {
845 #ifdef WITH_THREADLOG
846                 CtdlLogPrintf(CTDL_INFO, "Thread system. Creating BIG STACK thread.\n");
847 #endif
848                 if ((ret = pthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) {
849                         pthread_mutex_unlock(&this_thread->ThreadMutex);
850                         pthread_mutex_destroy(&(this_thread->ThreadMutex));
851                         pthread_cond_destroy(&(this_thread->ThreadCond));
852                         pthread_mutex_destroy(&(this_thread->SleepMutex));
853                         pthread_cond_destroy(&(this_thread->SleepCond));
854                         pthread_attr_destroy(&this_thread->attr);
855                         CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_setstacksize: %s\n",
856                                 strerror(ret));
857                         free(this_thread);
858                         return NULL;
859                 }
860         }
861
862         /*
863          * If we got here we are going to create the thread so we must initilise the structure
864          * first because most implimentations of threading can't create it in a stopped state
865          * and it might want to do things with its structure that aren't initialised otherwise.
866          */
867         if(name)
868         {
869                 this_thread->name = name;
870         }
871         else
872         {
873                 this_thread->name = "Un-named Thread";
874         }
875         
876         this_thread->flags = flags;
877         this_thread->thread_func = thread_func;
878         this_thread->user_args = args;
879         /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the
880          * load average for the system. If we don't do this then we create a mass of threads at the same time 
881          * because the creation didn't affect the load average.
882          */
883         this_thread->avg_blocked = 2;
884         
885         /*
886          * We pass this_thread into the thread as its args so that it can find out information
887          * about itself and it has a bit of storage space for itself, not to mention that the REAL
888          * thread function needs to finish off the setup of the structure
889          */
890         if ((ret = pthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0))
891         {
892
893                 CtdlLogPrintf(CTDL_ALERT, "Thread system, Can't create thread: %s\n",
894                         strerror(ret));
895                 pthread_mutex_unlock(&this_thread->ThreadMutex);
896                 pthread_mutex_destroy(&(this_thread->ThreadMutex));
897                 pthread_cond_destroy(&(this_thread->ThreadCond));
898                 pthread_mutex_destroy(&(this_thread->SleepMutex));
899                 pthread_cond_destroy(&(this_thread->SleepCond));
900                 pthread_attr_destroy(&this_thread->attr);
901                 free(this_thread);
902                 return NULL;
903         }
904         
905         num_threads++;  // Increase the count of threads in the system.
906         if(this_thread->flags & CTDLTHREAD_WORKER)
907                 num_workers++;
908
909         this_thread->next = CtdlThreadList;
910         CtdlThreadList = this_thread;
911         if (this_thread->next)
912                 this_thread->next->prev = this_thread;
913         
914         pthread_mutex_unlock(&this_thread->ThreadMutex);
915         
916         ctdl_thread_internal_calc_loadavg();
917         return this_thread;
918 }
919
920 /*
921  * Wrapper function to create a thread
922  * ensures the critical section and other protections are in place.
923  * char *name = name to give to thread, if NULL, use generic name
924  * int flags = flags to determine type of thread and standard facilities
925  */
926 CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args)
927 {
928         CtdlThreadNode *ret = NULL;
929         
930         begin_critical_section(S_THREAD_LIST);
931         ret = ctdl_internal_create_thread(name, flags, thread_func, args);
932         end_critical_section(S_THREAD_LIST);
933         return ret;
934 }
935
936
937
938 /*
939  * Internal function to schedule a thread.
940  * Must be called from within a S_THREAD_LIST critical section
941  */ 
942 CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) (void *arg), void *args, time_t when)
943 {
944         int ret = 0;
945         CtdlThreadNode *this_thread;
946
947         if (num_threads >= 32767)
948         {
949                 CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n");
950                 return NULL;
951         }
952                 
953         this_thread = malloc(sizeof(CtdlThreadNode));
954         if (this_thread == NULL) {
955                 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
956                 return NULL;
957         }
958         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
959         memset (this_thread, 0, sizeof(CtdlThreadNode));
960         
961         /* Create the mutex's early so we can use them */
962         pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
963         pthread_cond_init (&(this_thread->ThreadCond), NULL);
964         pthread_mutex_init (&(this_thread->SleepMutex), NULL);
965         pthread_cond_init (&(this_thread->SleepCond), NULL);
966         
967         this_thread->state = CTDL_THREAD_CREATE;
968         
969         if ((ret = pthread_attr_init(&this_thread->attr))) {
970                 pthread_mutex_destroy(&(this_thread->ThreadMutex));
971                 pthread_cond_destroy(&(this_thread->ThreadCond));
972                 pthread_mutex_destroy(&(this_thread->SleepMutex));
973                 pthread_cond_destroy(&(this_thread->SleepCond));
974                 CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_init: %s\n", strerror(ret));
975                 free(this_thread);
976                 return NULL;
977         }
978
979         /* Our per-thread stacks need to be bigger than the default size,
980          * otherwise the MIME parser crashes on FreeBSD, and the IMAP service
981          * crashes on 64-bit Linux.
982          */
983         if (flags & CTDLTHREAD_BIGSTACK)
984         {
985                 CtdlLogPrintf(CTDL_INFO, "Thread system. Creating BIG STACK thread.\n");
986                 if ((ret = pthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) {
987                         pthread_mutex_destroy(&(this_thread->ThreadMutex));
988                         pthread_cond_destroy(&(this_thread->ThreadCond));
989                         pthread_mutex_destroy(&(this_thread->SleepMutex));
990                         pthread_cond_destroy(&(this_thread->SleepCond));
991                         pthread_attr_destroy(&this_thread->attr);
992                         CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_setstacksize: %s\n",
993                                 strerror(ret));
994                         free(this_thread);
995                         return NULL;
996                 }
997         }
998
999         /*
1000          * If we got here we are going to create the thread so we must initilise the structure
1001          * first because most implimentations of threading can't create it in a stopped state
1002          * and it might want to do things with its structure that aren't initialised otherwise.
1003          */
1004         if(name)
1005         {
1006                 this_thread->name = name;
1007         }
1008         else
1009         {
1010                 this_thread->name = "Un-named Thread";
1011         }
1012         
1013         this_thread->flags = flags;
1014         this_thread->thread_func = thread_func;
1015         this_thread->user_args = args;
1016         /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the
1017          * load average for the system. If we don't do this then we create a mass of threads at the same time 
1018          * because the creation didn't affect the load average.
1019          */
1020         this_thread->avg_blocked = 2;
1021         
1022         /*
1023          * When to start this thread
1024          */
1025         this_thread->when = when;
1026
1027         begin_critical_section(S_SCHEDULE_LIST);
1028         this_thread->next = CtdlThreadSchedList;
1029         CtdlThreadSchedList = this_thread;
1030         if (this_thread->next)
1031                 this_thread->next->prev = this_thread;
1032         end_critical_section(S_SCHEDULE_LIST);
1033         
1034         return this_thread;
1035 }
1036
1037
1038
1039 CtdlThreadNode *ctdl_thread_internal_start_scheduled (CtdlThreadNode *this_thread)
1040 {
1041         int ret = 0;
1042         
1043         /*
1044          * We pass this_thread into the thread as its args so that it can find out information
1045          * about itself and it has a bit of storage space for itself, not to mention that the REAL
1046          * thread function needs to finish off the setup of the structure
1047          */
1048         if ((ret = pthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0))
1049         {
1050
1051                 CtdlLogPrintf(CTDL_ALERT, "Thread system, Can't create thread: %s\n",
1052                         strerror(ret));
1053                 return NULL;
1054         }
1055         
1056         
1057         num_threads++;  // Increase the count of threads in the system.
1058         if(this_thread->flags & CTDLTHREAD_WORKER)
1059                 num_workers++;
1060
1061         this_thread->next = CtdlThreadList;
1062         CtdlThreadList = this_thread;
1063         if (this_thread->next)
1064                 this_thread->next->prev = this_thread;
1065         
1066         return this_thread;
1067 }
1068
1069
1070
1071 void ctdl_thread_internal_check_scheduled(void)
1072 {
1073         CtdlThreadNode *this_thread, *that_thread;
1074         time_t now;
1075         
1076         if (try_critical_section(S_SCHEDULE_LIST))
1077                 return; /* If this list is locked we wait till the next chance */
1078         
1079         now = time(NULL);
1080         
1081 #ifdef WITH_THREADLOG
1082         CtdlLogPrintf(CTDL_DEBUG, "Checking for scheduled threads to start.\n");
1083 #endif
1084
1085         this_thread = CtdlThreadSchedList;
1086         while(this_thread)
1087         {
1088                 that_thread = this_thread;
1089                 this_thread = this_thread->next;
1090                 
1091                 if (now > that_thread->when)
1092                 {
1093                         /* Unlink from schedule list */
1094                         if (that_thread->prev)
1095                                 that_thread->prev->next = that_thread->next;
1096                         else
1097                                 CtdlThreadSchedList = that_thread->next;
1098                         if (that_thread->next)
1099                                 that_thread->next->prev = that_thread->prev;
1100                                 
1101                         that_thread->next = that_thread->prev = NULL;
1102 #ifdef WITH_THREADLOG
1103                         CtdlLogPrintf(CTDL_DEBUG, "About to start scheduled thread \"%s\".\n", that_thread->name);
1104 #endif
1105                         begin_critical_section(S_THREAD_LIST);
1106                         if (CT->state > CTDL_THREAD_STOP_REQ)
1107                         {       /* Only start it if the system is not stopping */
1108                                 pthread_mutex_lock(&that_thread->ThreadMutex);
1109                                 if (ctdl_thread_internal_start_scheduled (that_thread) == NULL)
1110                                 {
1111 #ifdef WITH_THREADLOG
1112                         CtdlLogPrintf(CTDL_DEBUG, "Failed to start scheduled thread \"%s\".\n", that_thread->name);
1113 #endif
1114                                         pthread_mutex_unlock(&that_thread->ThreadMutex);
1115                                         pthread_mutex_destroy(&(that_thread->ThreadMutex));
1116                                         pthread_cond_destroy(&(that_thread->ThreadCond));
1117                                         pthread_mutex_destroy(&(that_thread->SleepMutex));
1118                                         pthread_cond_destroy(&(that_thread->SleepCond));
1119                                         pthread_attr_destroy(&that_thread->attr);
1120                                         free(that_thread);
1121                                 }
1122                                 else
1123                                 {
1124                                         CtdlLogPrintf(CTDL_INFO, "Thread system, Started a scheduled thread \"%s\" (%ld).\n",
1125                                                 that_thread->name, that_thread->tid);
1126                                         pthread_mutex_unlock(&that_thread->ThreadMutex);
1127                                         ctdl_thread_internal_calc_loadavg();
1128                                 }
1129                         }
1130                         end_critical_section(S_THREAD_LIST);
1131                 }
1132                 else
1133                 {
1134 #ifdef WITH_THREADLOG
1135                         CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" will start in %ld seconds.\n", that_thread->name, that_thread->when - time(NULL));
1136 #endif
1137                 }
1138         }
1139         end_critical_section(S_SCHEDULE_LIST);
1140 }
1141
1142
1143 /*
1144  * A warapper function for select so we can show a thread as blocked
1145  */
1146 int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
1147 {
1148         int ret;
1149         
1150         ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1151         ret = select(n, readfds, writefds, exceptfds, timeout);
1152         ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
1153         return ret;
1154 }
1155
1156
1157
1158 void *new_worker_thread(void *arg);
1159 extern void close_masters (void);
1160
1161
1162
1163 void go_threading(void)
1164 {
1165         int i;
1166         CtdlThreadNode *last_worker;
1167         
1168         /*
1169          * Initialise the thread system
1170          */
1171         ctdl_thread_internal_init();
1172
1173         /* Second call to module init functions now that threading is up */
1174         initialise_modules(1);
1175
1176         /*
1177          * This thread is now used for garbage collection of other threads in the thread list
1178          */
1179         CtdlLogPrintf(CTDL_INFO, "Startup thread %d becoming garbage collector,\n", pthread_self());
1180
1181         /*
1182          * We do a lot of locking and unlocking of the thread list in here.
1183          * We do this so that we can repeatedly release time for other threads
1184          * that may be waiting on the thread list.
1185          * We are a low priority thread so we can afford to do this
1186          */
1187         
1188         while (CtdlThreadGetCount())
1189         {
1190                 if (CT->signal)
1191                         exit_signal = CT->signal;
1192                 if (exit_signal)
1193                 {
1194                         CtdlThreadStopAll();
1195 //                      close_masters();
1196                 }
1197                 check_sched_shutdown();
1198                 if (CT->state > CTDL_THREAD_STOP_REQ)
1199                 {
1200                         begin_critical_section(S_THREAD_LIST);
1201                         ctdl_thread_internal_calc_loadavg();
1202                         end_critical_section(S_THREAD_LIST);
1203                         
1204                         ctdl_thread_internal_check_scheduled(); /* start scheduled threads */
1205                 }
1206                 
1207                 /* Reduce the size of the worker thread pool if necessary. */
1208                 if ((CtdlThreadGetWorkers() > config.c_min_workers + 1) && (CtdlThreadWorkerAvg < 20) && (CT->state > CTDL_THREAD_STOP_REQ))
1209                 {
1210                         /* Ask a worker thread to stop as we no longer need it */
1211                         begin_critical_section(S_THREAD_LIST);
1212                         last_worker = CtdlThreadList;
1213                         while (last_worker)
1214                         {
1215                                 pthread_mutex_lock(&last_worker->ThreadMutex);
1216                                 if (last_worker->flags & CTDLTHREAD_WORKER && (last_worker->state > CTDL_THREAD_STOPPING) && (last_worker->Context == NULL))
1217                                 {
1218                                         pthread_mutex_unlock(&last_worker->ThreadMutex);
1219                                         break;
1220                                 }
1221                                 pthread_mutex_unlock(&last_worker->ThreadMutex);
1222                                 last_worker = last_worker->next;
1223                         }
1224                         end_critical_section(S_THREAD_LIST);
1225                         if (last_worker)
1226                         {
1227 #ifdef WITH_THREADLOG
1228                                 CtdlLogPrintf(CTDL_DEBUG, "Thread system, stopping excess worker thread \"%s\" (%ld).\n",
1229                                         last_worker->name,
1230                                         last_worker->tid
1231                                         );
1232 #endif
1233                                 CtdlThreadStop(last_worker);
1234                         }
1235                 }
1236         
1237                 /*
1238                  * If all our workers are working hard, start some more to help out
1239                  * with things
1240                  */
1241                 /* FIXME: come up with a better way to dynamically alter the number of threads
1242                  * based on the system load
1243                  */
1244 #ifdef NEW_WORKER
1245                 if ((((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkers() <= num_sessions) ) || CtdlThreadGetWorkers() < config.c_min_workers) && (CT->state > CTDL_THREAD_STOP_REQ))
1246 #else
1247                 if ((((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkerAvg() > 60) && (CtdlThreadGetLoadAvg() < 90) ) || CtdlThreadGetWorkers() < config.c_min_workers) && (CT->state > CTDL_THREAD_STOP_REQ))
1248 #endif /* NEW_WORKER */
1249                 {
1250                         for (i=0; i<5 ; i++)
1251                         {
1252 #ifdef NEW_WORKER
1253                                 CtdlThreadCreate("Worker Thread",
1254                                         CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER,
1255                                         new_worker_thread,
1256                                         NULL
1257                                         );
1258 #else
1259                                 CtdlThreadCreate("Worker Thread",
1260                                         CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER,
1261                                         worker_thread,
1262                                         NULL
1263                                         );
1264 #endif /* NEW_WORKER */
1265                         }
1266                 }
1267                 
1268                 CtdlThreadGC();
1269                 
1270                 if (CtdlThreadGetCount() <= 1) // Shutting down clean up the garbage collector
1271                 {
1272                         CtdlThreadGC();
1273                 }
1274                 
1275                 if (CtdlThreadGetCount())
1276                         CtdlThreadSleep(1);
1277         }
1278         /*
1279          * If the above loop exits we must be shutting down since we obviously have no threads
1280          */
1281         ctdl_thread_internal_cleanup();
1282 }
1283
1284
1285
1286
/*
 * Starting a new implementation of a worker thread.
 * This new implementation will be faster and do more work per thread.
 */
1291  
1292 /*
1293  * Select on master socket.
1294  * First worker thread in here acquires the lock and builds an FDSET of master sockets.
1295  * then it goes into a loop selecting on the master sockets timing out every few milliseconds.
1296  * If it times out it rebiulds its list and loops.
1297  * If the select succeeds it creates a new context and returns.
1298  * During this time the other workers are selecting on existing contexts or sleeping.
1299  */
1300 void select_on_master(void)
1301 {
1302         fd_set readfds;
1303         struct ServiceFunctionHook *serviceptr;
1304         int ssock;                      /* Descriptor for client socket */
1305         int highest;
1306         int m, i;
1307         int retval = 0;
1308         struct timeval tv;
1309         struct CitContext *con;
1310         const char *old_name;
1311
1312
1313
1314         old_name = CtdlThreadName("select_on_master");
1315
1316         /* Initialize the fdset. */
1317         FD_ZERO(&readfds);
1318         highest = 0;
1319
1320         /* First, add the various master sockets to the fdset. */
1321         for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) {
1322                 m = serviceptr->msock;
1323                 FD_SET(m, &readfds);
1324                 if (m > highest) {
1325                         highest = m;
1326                 }
1327         }
1328
1329         tv.tv_sec = 1;          /* wake up every 1 sec if no input */
1330         tv.tv_usec = 0;
1331         retval = CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv);
1332
1333         /* Select got an error or we are shutting down so get out */
1334         if (retval == 0 || CtdlThreadCheckStop()) {
1335                 CtdlThreadName(old_name);
1336                 return;
1337         }
1338
1339         /* Select says something happened on one of our master sockets so now we handle it */
1340         for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) {
1341                 if (FD_ISSET(serviceptr->msock, &readfds)) {
1342                         ssock = accept(serviceptr->msock, NULL, 0);
1343                         if (ssock >= 0) {
1344                                 CtdlLogPrintf(CTDL_DEBUG, "New client socket %d\n", ssock);
1345                                 /* The master socket is non-blocking but the client
1346                                  * sockets need to be blocking, otherwise certain
1347                                  * operations barf on FreeBSD.  Not a fatal error.
1348                                  */
1349                                 if (fcntl(ssock, F_SETFL, 0) < 0) {
1350                                         CtdlLogPrintf(CTDL_EMERG,
1351                                                       "citserver: Can't set socket to blocking: %s\n",
1352                                                       strerror(errno));
1353                                 }
1354
1355                                 /* New context will be created already
1356                                  * set up in the CON_EXECUTING state.
1357                                  */
1358                                 con = CreateNewContext();
1359                                 CT->Context = con;
1360
1361                                 /* Assign our new socket number to it. */
1362                                 con->client_socket = ssock;
1363                                 con->h_command_function = serviceptr->h_command_function;
1364                                 con->h_async_function = serviceptr->h_async_function;
1365                                 con->ServiceName = serviceptr->ServiceName;
1366                                 /* Determine whether it's a local socket */
1367                                 if (serviceptr->sockpath != NULL)
1368                                         con->is_local_socket = 1;
1369
1370                                 /* Set the SO_REUSEADDR socket option */
1371                                 i = 1;
1372                                 setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
1373
1374                                 become_session(con);
1375                                 begin_session(con);
1376                                 serviceptr->h_greeting_function();
1377                                 become_session(NULL);
1378                                 con->state = CON_IDLE;
1379                                 break;
1380                         }
1381                 }
1382         }
1383
1384         CtdlThreadName(old_name);
1385 }
1386
1387 /*
1388  * Select on client socket.
1389  * First worker thread in here acquires the lock and builds an FDSET of client sockets.
1390  * then it selects on the client sockets timing out after 1 second.
1391  * If it times out the thread goes off to check on housekeeping etc.
1392  * If the select succeeds the thread goes off to handle the client request.
1393  * If the list of client connections is empty the threads all sleep for one second
1394  */
1395 struct CitContext *select_on_client(void)
1396 {
1397         fd_set readfds;
1398         struct timeval tv;
1399         int retval = 0;
1400         int highest=0;
1401         const char *old_name;
1402         
1403         
1404         old_name = CtdlThreadName("select_on_client");
1405         
1406         /* Initialise the fdset */
1407         FD_ZERO(&readfds);
1408         FD_SET(CT->Context->client_socket, &readfds);
1409         highest = CT->Context->client_socket;   
1410         /* Now we can select on any connections that are waiting */
1411         
1412         if (!CtdlThreadCheckStop())
1413         {
1414                 tv.tv_sec = config.c_sleeping;          /* wake up every second if no input */
1415                 tv.tv_usec = 0;
1416                 retval = select(highest + 1, &readfds, NULL, NULL, &tv);
1417         }
1418         else    /* Shutting down? */
1419         {
1420                 CtdlThreadName(old_name);
1421                 return(NULL);
1422         }
1423                 
1424
1425         /* Now figure out who made this select() unblock.
1426          * First, check for an error or exit condition.
1427          */
1428         if (retval < 0) {
1429                 if (errno == EBADF) {
1430                         CtdlLogPrintf(CTDL_NOTICE, "select() failed: (%s)\n",
1431                                 strerror(errno));
1432                 }
1433                 if (errno != EINTR) {
1434                         CtdlLogPrintf(CTDL_EMERG, "Exiting (%s)\n", strerror(errno));
1435                         CtdlThreadStopAll();
1436                 } else if (!CtdlThreadCheckStop()) {
1437                         CtdlLogPrintf(CTDL_DEBUG, "Un handled select failure.\n");
1438                 }
1439                 CtdlThreadName(old_name);
1440                 return NULL;
1441         }
1442         else if(retval == 0)
1443         {
1444                 CtdlThreadName(old_name);
1445                 CT->Context->kill_me = 1;
1446                 CT->Context = NULL;
1447                 return CT->Context;
1448         }
1449         
1450         CT->Context->state = CON_EXECUTING;
1451         CT->Context->input_waiting = 1;
1452         
1453         CtdlThreadName(old_name);
1454         return (CT->Context);
1455 }
1456
1457
1458
1459 /*
1460  * Do the worker threads work when needed
1461  */
1462 int execute_session(struct CitContext *bind_me)
1463 {
1464         int force_purge;
1465         
1466         become_session(bind_me);
1467
1468         /* If the client has sent a command, execute it. */
1469         if (CC->input_waiting) {
1470                 CC->h_command_function();
1471                 CC->input_waiting = 0;
1472         }
1473
1474         /* If there are asynchronous messages waiting and the
1475          * client supports it, do those now */
1476         if ((CC->is_async) && (CC->async_waiting)
1477            && (CC->h_async_function != NULL)) {
1478                 CC->h_async_function();
1479                 CC->async_waiting = 0;
1480         }
1481                 
1482         force_purge = CC->kill_me;
1483         if (force_purge)
1484                 CT->Context = NULL;
1485         become_session(NULL);
1486         bind_me->state = CON_IDLE;
1487         return force_purge;
1488 }
1489
1490
1491
1492 extern void dead_session_purge(int force);
1493
1494 /*
1495  * A new worker_thread loop.
1496  */
1497  
1498 void *new_worker_thread(void *arg)
1499 {
1500         struct CitContext *bind_me;
1501         int force_purge;
1502         
1503         while (!CtdlThreadCheckStop()) {
1504
1505                 /* make doubly sure we're not holding any stale db handles
1506                  * which might cause a deadlock.
1507                  */
1508                 cdb_check_handles();
1509                 force_purge = 0;
1510                 bind_me = NULL;         /* Which session shall we handle? */
1511                         
1512                 if (CT->Context == NULL)
1513                         select_on_master();
1514                 if (CtdlThreadCheckStop())
1515                         break;
1516                         
1517                 if (CT->Context)
1518                         bind_me = select_on_client();
1519                 if (CtdlThreadCheckStop())
1520                         break;
1521                         
1522                 if (bind_me)
1523                         force_purge = execute_session(bind_me);
1524                         
1525                 dead_session_purge(force_purge);
1526                 if (CtdlThreadCheckStop())
1527                         break;
1528                         
1529                 do_housekeeping();
1530         }
1531         return NULL;
1532 }