* by counting up threads starting at 3, we seem to get what top shows in the LWPID...
[citadel.git] / citadel / threads.c
1 /*
2  * $Id$
3  *
4  * Citadel "system dependent" stuff.
5  * See COPYING for copyright information.
6  *
7  * Here's where we have the Citadel thread implimentation
8  *
9  */
10
11 #include <stdlib.h>
12 #include <unistd.h>
13 #include <stdio.h>
14 #include <sys/types.h>
15 #include <errno.h>
16 #include <sys/socket.h>
17 #include <unistd.h>
18 #include <fcntl.h>
19 #include <signal.h>
20
21 #if TIME_WITH_SYS_TIME
22 # include <sys/time.h>
23 # include <time.h>
24 #else
25 # if HAVE_SYS_TIME_H
26 #  include <sys/time.h>
27 # else
28 #  include <time.h>
29 # endif
30 #endif
31
32 #include <libcitadel.h>
33
34 #include "threads.h"
35 #include "ctdl_module.h"
36 #include "modules_init.h"
37 #include "housekeeping.h"
38 #include "config.h"
39 #include "citserver.h"
40 #include "sysdep_decls.h"
41 #include "context.h"
42
43 /*
44  * define this to use the new worker_thread method of handling connections
45  */
46 //#define NEW_WORKER
47
48 /*
49  * New thread interface.
50  * To create a thread you must call one of the create thread functions.
51  * You must pass it the address of (a pointer to a CtdlThreadNode initialised to NULL) like this
52  * struct CtdlThreadNode *node = NULL;
53  * pass in &node
54  * If the thread is created *node will point to the thread control structure for the created thread.
55  * If the thread creation fails *node remains NULL
56  * Do not free the memory pointed to by *node, it doesn't belong to you.
57  * This new interface duplicates much of the eCrash stuff. We should go for closer integration since that would
58  * remove the need for the calls to eCrashRegisterThread and friends
59  */
60
61 static int next_tid = 3;                        /* offset LWPID to PID */
62 static int num_threads = 0;                     /* Current number of threads */
63 static int num_workers = 0;                     /* Current number of worker threads */
64 long statcount = 0;             /* are we doing a stats check? */
65 static long stats_done = 0;
66
67 CtdlThreadNode *CtdlThreadList = NULL;
68 CtdlThreadNode *CtdlThreadSchedList = NULL;
69
70 static CtdlThreadNode *GC_thread = NULL;
71 static char *CtdlThreadStates[CTDL_THREAD_LAST_STATE];
72 double CtdlThreadLoadAvg = 0;
73 double CtdlThreadWorkerAvg = 0;
74 citthread_key_t ThreadKey;
75
76 citthread_mutex_t Critters[MAX_SEMAPHORES];     /* Things needing locking */
77
78
79
80 void InitialiseSemaphores(void)
81 {
82         int i;
83
84         /* Set up a bunch of semaphores to be used for critical sections */
85         for (i=0; i<MAX_SEMAPHORES; ++i) {
86                 citthread_mutex_init(&Critters[i], NULL);
87         }
88 }
89
90
91
92
93 /*
94  * Obtain a semaphore lock to begin a critical section.
95  * but only if no one else has one
96  */
97 int try_critical_section(int which_one)
98 {
99         /* For all types of critical sections except those listed here,
100          * ensure nobody ever tries to do a critical section within a
101          * transaction; this could lead to deadlock.
102          */
103         if (    (which_one != S_FLOORCACHE)
104 #ifdef DEBUG_MEMORY_LEAKS
105                 && (which_one != S_DEBUGMEMLEAKS)
106 #endif
107                 && (which_one != S_RPLIST)
108         ) {
109                 cdb_check_handles();
110         }
111         return (citthread_mutex_trylock(&Critters[which_one]));
112 }
113
114
115 /*
116  * Obtain a semaphore lock to begin a critical section.
117  */
118 void begin_critical_section(int which_one)
119 {
120         /* CtdlLogPrintf(CTDL_DEBUG, "begin_critical_section(%d)\n", which_one); */
121
122         /* For all types of critical sections except those listed here,
123          * ensure nobody ever tries to do a critical section within a
124          * transaction; this could lead to deadlock.
125          */
126         if (    (which_one != S_FLOORCACHE)
127 #ifdef DEBUG_MEMORY_LEAKS
128                 && (which_one != S_DEBUGMEMLEAKS)
129 #endif
130                 && (which_one != S_RPLIST)
131         ) {
132                 cdb_check_handles();
133         }
134         citthread_mutex_lock(&Critters[which_one]);
135 }
136
137 /*
138  * Release a semaphore lock to end a critical section.
139  */
140 void end_critical_section(int which_one)
141 {
142         citthread_mutex_unlock(&Critters[which_one]);
143 }
144
145
146 /*
147  * A function to destroy the TSD
148  */
149 static void ctdl_thread_internal_dest_tsd(void *arg)
150 {
151         if (arg != NULL) {
152                 check_handles(arg);
153                 free(arg);
154         }
155 }
156
157
158 /*
159  * A function to initialise the thread TSD
160  */
161 void ctdl_thread_internal_init_tsd(void)
162 {
163         int ret;
164         
165         if ((ret = citthread_key_create(&ThreadKey, ctdl_thread_internal_dest_tsd))) {
166                 CtdlLogPrintf(CTDL_EMERG, "citthread_key_create: %s\n", strerror(ret));
167                 exit(CTDLEXIT_DB);
168         }
169 }
170
171 /*
172  * Ensure that we have a key for thread-specific data. 
173  *
174  * This should be called immediately after startup by any thread 
175  * 
176  */
177 void CtdlThreadAllocTSD(void)
178 {
179         ThreadTSD *tsd;
180
181         if (citthread_getspecific(ThreadKey) != NULL)
182                 return;
183
184         tsd = malloc(sizeof(ThreadTSD));
185
186         tsd->tid = NULL;
187
188         memset(tsd->cursors, 0, sizeof tsd->cursors);
189         tsd->self = NULL;
190         
191         citthread_setspecific(ThreadKey, tsd);
192 }
193
194
195 void ctdl_thread_internal_free_tsd(void)
196 {
197         ctdl_thread_internal_dest_tsd(citthread_getspecific(ThreadKey));
198         citthread_setspecific(ThreadKey, NULL);
199 }
200
201
202 void ctdl_thread_internal_cleanup(void)
203 {
204         int i;
205         CtdlThreadNode *this_thread, *that_thread;
206         
207         for (i=0; i<CTDL_THREAD_LAST_STATE; i++)
208         {
209                 free (CtdlThreadStates[i]);
210         }
211         
212         /* Clean up the scheduled thread list */
213         this_thread = CtdlThreadSchedList;
214         while (this_thread)
215         {
216                 that_thread = this_thread;
217                 this_thread = this_thread->next;
218                 citthread_mutex_destroy(&that_thread->ThreadMutex);
219                 citthread_cond_destroy(&that_thread->ThreadCond);
220                 citthread_mutex_destroy(&that_thread->SleepMutex);
221                 citthread_cond_destroy(&that_thread->SleepCond);
222                 citthread_attr_destroy(&that_thread->attr);
223                 free(that_thread);
224         }
225         ctdl_thread_internal_free_tsd();
226 }
227
228 void ctdl_thread_internal_init(void)
229 {
230         CtdlThreadNode *this_thread;
231         int ret = 0;
232         
233         CtdlThreadStates[CTDL_THREAD_INVALID] = strdup ("Invalid Thread");
234         CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread");
235         CtdlThreadStates[CTDL_THREAD_CREATE] = strdup("Thread being Created");
236         CtdlThreadStates[CTDL_THREAD_CANCELLED] = strdup("Thread Cancelled");
237         CtdlThreadStates[CTDL_THREAD_EXITED] = strdup("Thread Exited");
238         CtdlThreadStates[CTDL_THREAD_STOPPING] = strdup("Thread Stopping");
239         CtdlThreadStates[CTDL_THREAD_STOP_REQ] = strdup("Thread Stop Requested");
240         CtdlThreadStates[CTDL_THREAD_SLEEPING] = strdup("Thread Sleeping");
241         CtdlThreadStates[CTDL_THREAD_RUNNING] = strdup("Thread Running");
242         CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked");
243         
244         /* Get ourself a thread entry */
245         this_thread = malloc(sizeof(CtdlThreadNode));
246         if (this_thread == NULL) {
247                 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
248                 return;
249         }
250         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
251         memset (this_thread, 0, sizeof(CtdlThreadNode));
252         
253         citthread_mutex_init (&(this_thread->ThreadMutex), NULL);
254         citthread_cond_init (&(this_thread->ThreadCond), NULL);
255         citthread_mutex_init (&(this_thread->SleepMutex), NULL);
256         citthread_cond_init (&(this_thread->SleepCond), NULL);
257         
258         /* We are garbage collector so create us as running */
259         this_thread->state = CTDL_THREAD_RUNNING;
260         
261         if ((ret = citthread_attr_init(&this_thread->attr))) {
262                 CtdlLogPrintf(CTDL_EMERG, "Thread system, citthread_attr_init: %s\n", strerror(ret));
263                 free(this_thread);
264                 return;
265         }
266
267         this_thread->name = "Garbage Collection Thread";
268         
269         this_thread->tid = citthread_self();
270         GC_thread = this_thread;
271         CT = this_thread;
272         
273         num_threads++;  // Increase the count of threads in the system.
274
275         this_thread->next = CtdlThreadList;
276         CtdlThreadList = this_thread;
277         if (this_thread->next)
278                 this_thread->next->prev = this_thread;
279         /* Set up start times */
280         gettimeofday(&this_thread->start_time, NULL);           /* Time this thread started */
281         memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval));     /* Changed state so mark it. */
282 }
283
284
285 /*
286  * A function to update a threads load averages
287  */
288  void ctdl_thread_internal_update_avgs(CtdlThreadNode *this_thread)
289  {
290         struct timeval now, result;
291         double last_duration;
292
293         gettimeofday(&now, NULL);
294         timersub(&now, &(this_thread->last_state_change), &result);
295         /* I don't think these mutex's are needed here */
296         citthread_mutex_lock(&this_thread->ThreadMutex);
297         // result now has a timeval for the time we spent in the last state since we last updated
298         last_duration = (double)result.tv_sec + ((double)result.tv_usec / (double) 1000000);
299         if (this_thread->state == CTDL_THREAD_SLEEPING)
300                 this_thread->avg_sleeping += last_duration;
301         if (this_thread->state == CTDL_THREAD_RUNNING)
302                 this_thread->avg_running += last_duration;
303         if (this_thread->state == CTDL_THREAD_BLOCKED)
304                 this_thread->avg_blocked += last_duration;
305         memcpy (&this_thread->last_state_change, &now, sizeof (struct timeval));
306         citthread_mutex_unlock(&this_thread->ThreadMutex);
307 }
308
309 /*
310  * A function to chenge the state of a thread
311  */
312 void ctdl_thread_internal_change_state (CtdlThreadNode *this_thread, enum CtdlThreadState new_state)
313 {
314         /*
315          * Wether we change state or not we need update the load values
316          */
317         ctdl_thread_internal_update_avgs(this_thread);
318         /* This mutex not needed here? */
319         citthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */
320         if ((new_state == CTDL_THREAD_STOP_REQ) && (this_thread->state > CTDL_THREAD_STOP_REQ))
321                 this_thread->state = new_state;
322         if (((new_state == CTDL_THREAD_SLEEPING) || (new_state == CTDL_THREAD_BLOCKED)) && (this_thread->state == CTDL_THREAD_RUNNING))
323                 this_thread->state = new_state;
324         if ((new_state == CTDL_THREAD_RUNNING) && ((this_thread->state == CTDL_THREAD_SLEEPING) || (this_thread->state == CTDL_THREAD_BLOCKED)))
325                 this_thread->state = new_state;
326         citthread_mutex_unlock(&this_thread->ThreadMutex);
327 }
328
329
330 /*
331  * A function to tell all threads to exit
332  */
333 void CtdlThreadStopAll(void)
334 {
335         /* First run any registered shutdown hooks.  This probably doesn't belong here. */
336         PerformSessionHooks(EVT_SHUTDOWN);
337
338         //FIXME: The signalling of the condition should not be in the critical_section
339         // We need to build a list of threads we are going to signal and then signal them afterwards
340         
341         CtdlThreadNode *this_thread;
342         
343         begin_critical_section(S_THREAD_LIST);
344         this_thread = CtdlThreadList;
345         // Ask the GC thread to stop first so everything knows we are shutting down.
346         GC_thread->state = CTDL_THREAD_STOP_REQ;
347         while(this_thread)
348         {
349                 if (!citthread_equal(this_thread->tid, GC_thread->tid))
350                         citthread_kill(this_thread->tid, SIGHUP);
351
352                 ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ);
353                 citthread_cond_signal(&this_thread->ThreadCond);
354                 citthread_cond_signal(&this_thread->SleepCond);
355                 this_thread->stop_ticker = time(NULL);
356                 CtdlLogPrintf(CTDL_DEBUG, "Thread system stopping thread \"%s\" (0x%08lx).\n",
357                         this_thread->name, this_thread->tid);
358                 this_thread = this_thread->next;
359         }
360         end_critical_section(S_THREAD_LIST);
361 }
362
363
364 /*
365  * A function to wake up all sleeping threads
366  */
367 void CtdlThreadWakeAll(void)
368 {
369         CtdlThreadNode *this_thread;
370         
371         CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n");
372         
373         begin_critical_section(S_THREAD_LIST);
374         this_thread = CtdlThreadList;
375         while(this_thread)
376         {
377                 if (!this_thread->thread_func)
378                 {
379                         citthread_cond_signal(&this_thread->ThreadCond);
380                         citthread_cond_signal(&this_thread->SleepCond);
381                 }
382                 this_thread = this_thread->next;
383         }
384         end_critical_section(S_THREAD_LIST);
385 }
386
387
388 /*
389  * A function to return the number of threads running in the system
390  */
391 int CtdlThreadGetCount(void)
392 {
393         return  num_threads;
394 }
395
396 int CtdlThreadGetWorkers(void)
397 {
398         return  num_workers;
399 }
400
401 double CtdlThreadGetWorkerAvg(void)
402 {
403         double ret;
404         
405         begin_critical_section(S_THREAD_LIST);
406         ret =  CtdlThreadWorkerAvg;
407         end_critical_section(S_THREAD_LIST);
408         return ret;
409 }
410
411 double CtdlThreadGetLoadAvg(void)
412 {
413         double load_avg[3] = {0.0, 0.0, 0.0};
414
415         int ret = 0;
416         int smp_num_cpus;
417
418         /* Borrowed this straight from procps */
419         smp_num_cpus = sysconf(_SC_NPROCESSORS_ONLN);
420         if(smp_num_cpus<1) smp_num_cpus=1; /* SPARC glibc is buggy */
421
422 #ifdef HAVE_GETLOADAVG
423         ret = getloadavg(load_avg, 3);
424 #endif
425         if (ret < 0)
426                 return 0;
427         return load_avg[0] / smp_num_cpus;
428 /*
429  * This old chunk of code return a value that indicated the load on citserver
430  * This value could easily reach 100 % even when citserver was doing very little and
431  * hence the machine has much more spare capacity.
432  * Because this value was used to determine if the machine was under heavy load conditions
433  * from other processes in the system then citserver could be strangled un-necesarily
434  * What we are actually trying to achieve is to strangle citserver if the machine is heavily loaded.
435  * So we have changed this.
436
437         begin_critical_section(S_THREAD_LIST);
438         ret =  CtdlThreadLoadAvg;
439         end_critical_section(S_THREAD_LIST);
440         return ret;
441 */
442 }
443
444
445
446
447 /*
448  * A function to rename a thread
449  * Returns a const char *
450  */
451 const char *CtdlThreadName(const char *name)
452 {
453         const char *old_name;
454         
455         if (!CT)
456         {
457                 CtdlLogPrintf(CTDL_WARNING, "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n", name);
458                 return NULL;
459         }
460         old_name = CT->name;
461         if (name)
462                 CT->name = name;
463         return (old_name);
464 }       
465
466
467 /*
468  * A function to force a thread to exit
469  */
470 void CtdlThreadCancel(CtdlThreadNode *thread)
471 {
472         CtdlThreadNode *this_thread;
473         
474         if (!thread)
475                 this_thread = CT;
476         else
477                 this_thread = thread;
478         if (!this_thread)
479         {
480                 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n");
481                 CtdlThreadStopAll();
482                 return;
483         }
484         
485         if (!this_thread->thread_func)
486         {
487                 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n");
488                 CtdlThreadStopAll();
489                 return;
490         }
491         
492         ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_CANCELLED);
493         citthread_cancel(this_thread->tid);
494 }
495
496
497 /*
498  * A function for a thread to check if it has been asked to stop
499  */
500 int CtdlThreadCheckStop(void)
501 {
502         int state;
503         
504         if (!CT)
505         {
506                 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n");
507                 CtdlThreadStopAll();
508                 return -1;
509         }
510         
511         state = CT->state;
512
513         if (CT->signal)
514         {
515                 CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" caught signal %d.\n", CT->name, CT->signal);
516                 if (CT->signal == SIGHUP)
517                         CT->state = CTDL_THREAD_STOP_REQ;
518                 CT->signal = 0;
519         }
520         if(state == CTDL_THREAD_STOP_REQ)
521         {
522                 CT->state = CTDL_THREAD_STOPPING;
523                 return -1;
524         }
525         else if((state < CTDL_THREAD_STOP_REQ) && (state > CTDL_THREAD_CREATE))
526         {
527                 return -1;
528         }
529         return 0;
530 }
531
532
533 /*
534  * A function to ask a thread to exit
535  * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit
536  */
537 void CtdlThreadStop(CtdlThreadNode *thread)
538 {
539         CtdlThreadNode *this_thread;
540         
541         if (!thread)
542                 this_thread = CT;
543         else
544                 this_thread = thread;
545         if (!this_thread)
546                 return;
547         if (!(this_thread->thread_func))
548                 return;         // Don't stop garbage collector
549
550         if (!citthread_equal(this_thread->tid, GC_thread->tid))
551                 citthread_kill(this_thread->tid, SIGHUP);
552
553         ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ);
554         citthread_cond_signal(&this_thread->ThreadCond);
555         citthread_cond_signal(&this_thread->SleepCond);
556         this_thread->stop_ticker = time(NULL);
557 }
558
559 /*
560  * So we now have a sleep command that works with threads but it is in seconds
561  */
562 void CtdlThreadSleep(int secs)
563 {
564         struct timespec wake_time;
565         struct timeval time_now;
566         
567         
568         if (!CT)
569         {
570                 CtdlLogPrintf(CTDL_WARNING, "CtdlThreadSleep() called by something that is not a thread. Should we die?\n");
571                 return;
572         }
573         
574         memset (&wake_time, 0, sizeof(struct timespec));
575         gettimeofday(&time_now, NULL);
576         wake_time.tv_sec = time_now.tv_sec + secs;
577         wake_time.tv_nsec = time_now.tv_usec * 10;
578
579         ctdl_thread_internal_change_state (CT, CTDL_THREAD_SLEEPING);
580         
581         citthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */
582         citthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex, &wake_time);
583         citthread_mutex_unlock(&CT->ThreadMutex);
584         
585         ctdl_thread_internal_change_state (CT, CTDL_THREAD_RUNNING);
586 }
587
588
589 /*
590  * Routine to clean up our thread function on exit
591  */
592 static void ctdl_internal_thread_cleanup(void *arg)
593 {
594         /*
595          * In here we were called by the current thread because it is exiting
596          * NB. WE ARE THE CURRENT THREAD
597          */
598         if (CT)
599         {
600                 const char *name = CT->name;
601                 const pid_t tid = CT->tid;
602
603                 CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (0x%08lx) exited.\n", name, tid);
604         }
605         else 
606         {
607                 CtdlLogPrintf(CTDL_NOTICE, "some ((unknown ? ? ?) Thread exited.\n");
608         }
609         
610         #ifdef HAVE_BACKTRACE
611 ///     eCrash_UnregisterThread();
612         #endif
613         
614         citthread_mutex_lock(&CT->ThreadMutex);
615         CT->state = CTDL_THREAD_EXITED; // needs to be last thing else house keeping will unlink us too early
616         citthread_mutex_unlock(&CT->ThreadMutex);
617 }
618
619 /*
620  * A quick function to show the load averages
621  */
622 void ctdl_thread_internal_calc_loadavg(void)
623 {
624         CtdlThreadNode *that_thread;
625         double load_avg, worker_avg;
626         int workers = 0;
627
628         that_thread = CtdlThreadList;
629         load_avg = 0;
630         worker_avg = 0;
631         while(that_thread)
632         {
633                 /* Update load averages */
634                 ctdl_thread_internal_update_avgs(that_thread);
635                 citthread_mutex_lock(&that_thread->ThreadMutex);
636                 that_thread->load_avg = (that_thread->avg_sleeping + that_thread->avg_running) / (that_thread->avg_sleeping + that_thread->avg_running + that_thread->avg_blocked) * 100;
637                 that_thread->avg_sleeping /= 2;
638                 that_thread->avg_running /= 2;
639                 that_thread->avg_blocked /= 2;
640                 load_avg += that_thread->load_avg;
641                 if (that_thread->flags & CTDLTHREAD_WORKER)
642                 {
643                         worker_avg += that_thread->load_avg;
644                         workers++;
645                 }
646 #ifdef WITH_THREADLOG
647                 CtdlLogPrintf(CTDL_DEBUG, "CtdlThread, \"%s\" (%lu) \"%s\" %.2f %.2f %.2f %.2f\n",
648                         that_thread->name,
649                         that_thread->tid,
650                         CtdlThreadStates[that_thread->state],
651                         that_thread->avg_sleeping,
652                         that_thread->avg_running,
653                         that_thread->avg_blocked,
654                         that_thread->load_avg);
655 #endif
656                 citthread_mutex_unlock(&that_thread->ThreadMutex);
657                 that_thread = that_thread->next;
658         }
659         CtdlThreadLoadAvg = load_avg/num_threads;
660         CtdlThreadWorkerAvg = worker_avg/workers;
661 #ifdef WITH_THREADLOG
662         CtdlLogPrintf(CTDL_INFO, "System load average %.2f, workers averag %.2f, threads %d, workers %d, sessions %d\n", CtdlThreadGetLoadAvg(), CtdlThreadWorkerAvg, num_threads, num_workers, num_sessions);
663 #endif
664 }
665
666
667 /*
668  * Garbage collection routine.
669  * Gets called by main() in a loop to clean up the thread list periodically.
670  */
671 void CtdlThreadGC (void)
672 {
673         CtdlThreadNode *this_thread, *that_thread;
674         int workers = 0, sys_workers;
675         int ret=0;
676
677         begin_critical_section(S_THREAD_LIST);
678         
679         /* Handle exiting of garbage collector thread */
680         if(num_threads == 1)
681                 CtdlThreadList->state = CTDL_THREAD_EXITED;
682         
683 #ifdef WITH_THREADLOG
684         CtdlLogPrintf(CTDL_DEBUG, "Thread system running garbage collection.\n");
685 #endif
686         /*
687          * Woke up to do garbage collection
688          */
689         this_thread = CtdlThreadList;
690         while(this_thread)
691         {
692                 that_thread = this_thread;
693                 this_thread = this_thread->next;
694                 
695                 if ((that_thread->state == CTDL_THREAD_STOP_REQ || that_thread->state == CTDL_THREAD_STOPPING)
696                         && (!citthread_equal(that_thread->tid, citthread_self())))
697                                 CtdlLogPrintf(CTDL_DEBUG, "Waiting for thread %s (0x%08lx) to exit.\n", that_thread->name, that_thread->tid);
698                 else
699                 {
700                         /**
701                          * Catch the situation where a worker was asked to stop but couldn't and we are not
702                          * shutting down.
703                          */
704                         that_thread->stop_ticker = 0;
705                 }
706                 
707                 if (that_thread->stop_ticker + 5 == time(NULL))
708                 {
709                         CtdlLogPrintf(CTDL_DEBUG, "Thread System: The thread \"%s\" (0x%08lx) failed to self terminate within 5 ticks. It would be cancelled now.\n", that_thread->name, that_thread->tid);
710                         if ((that_thread->flags & CTDLTHREAD_WORKER) == 0)
711                                 CtdlLogPrintf(CTDL_INFO, "Thread System: A non worker thread would have been canceled this may cause message loss.\n");
712 //                      that_thread->state = CTDL_THREAD_CANCELLED;
713                         that_thread->stop_ticker++;
714 //                      citthread_cancel(that_thread->tid);
715 //                      continue;
716                 }
717                 
718                 /* Do we need to clean up this thread? */
719                 if ((that_thread->state != CTDL_THREAD_EXITED) && (that_thread->state != CTDL_THREAD_CANCELLED))
720                 {
721                         if(that_thread->flags & CTDLTHREAD_WORKER)
722                                 workers++;      /* Sanity check on number of worker threads */
723                         continue;
724                 }
725                 
726                 if (citthread_equal(that_thread->tid, citthread_self()) && that_thread->thread_func)
727                 {       /* Sanity check */
728                         end_critical_section(S_THREAD_LIST);
729                         CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, a thread is trying to clean up after itself.\n");
730                         abort();
731                         return;
732                 }
733                 
734                 if (num_threads <= 0)
735                 {       /* Sanity check */
736                         end_critical_section(S_THREAD_LIST);
737                         CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n");
738                         abort();
739                         return;
740                 }
741
742                 if(that_thread->flags & CTDLTHREAD_WORKER)
743                         num_workers--;  /* This is a wroker thread so reduce the count. */
744                 num_threads--;
745                 /* If we are unlinking the list head then the next becomes the list head */
746                 if(that_thread->prev)
747                         that_thread->prev->next = that_thread->next;
748                 else
749                         CtdlThreadList = that_thread->next;
750                 if(that_thread->next)
751                         that_thread->next->prev = that_thread->prev;
752                 
753                 citthread_cond_signal(&that_thread->ThreadCond);
754                 citthread_cond_signal(&that_thread->SleepCond); // Make sure this thread is awake
755                 citthread_mutex_lock(&that_thread->ThreadMutex);        // Make sure it has done what its doing
756                 citthread_mutex_unlock(&that_thread->ThreadMutex);
757                 /*
758                  * Join on the thread to do clean up and prevent memory leaks
759                  * Also makes sure the thread has cleaned up after itself before we remove it from the list
760                  * We can join on the garbage collector thread the join should just return EDEADLCK
761                  */
762                 ret = citthread_join (that_thread->tid, NULL);
763                 if (ret == EDEADLK)
764                         CtdlLogPrintf(CTDL_DEBUG, "Garbage collection on own thread.\n");
765                 else if (ret == EINVAL)
766                         CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, that thread already joined on.\n");
767                 else if (ret == ESRCH)
768                         CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, no thread to join on.\n");
769                 else if (ret != 0)
770                         CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, citthread_join returned an unknown error(%d).\n", ret);
771                 /*
772                  * Now we own that thread entry
773                  */
774                 CtdlLogPrintf(CTDL_INFO, "Garbage Collection for thread \"%s\" (0x%08lx).\n",
775                         that_thread->name, that_thread->tid);
776                 citthread_mutex_destroy(&that_thread->ThreadMutex);
777                 citthread_cond_destroy(&that_thread->ThreadCond);
778                 citthread_mutex_destroy(&that_thread->SleepMutex);
779                 citthread_cond_destroy(&that_thread->SleepCond);
780                 citthread_attr_destroy(&that_thread->attr);
781                 free(that_thread);
782         }
783         sys_workers = num_workers;
784         end_critical_section(S_THREAD_LIST);
785         
786         /* Sanity check number of worker threads */
787         if (workers != sys_workers)
788         {
789                 CtdlLogPrintf(CTDL_EMERG,
790                         "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n",
791                         workers, sys_workers
792                         );
793                 abort();
794         }
795 }
796
797
798
799  
800 /*
801  * Runtime function for a Citadel Thread.
802  * This initialises the threads environment and then calls the user supplied thread function
803  * Note that this is the REAL thread function and wraps the users thread function.
804  */ 
805 static void *ctdl_internal_thread_func (void *arg)
806 {
807         CtdlThreadNode *this_thread;
808         void *ret = NULL;
809
810         /* lock and unlock the thread list.
811          * This causes this thread to wait until all its creation stuff has finished before it
812          * can continue its execution.
813          */
814         begin_critical_section(S_THREAD_LIST);
815         this_thread = (CtdlThreadNode *) arg;
816         gettimeofday(&this_thread->start_time, NULL);           /* Time this thread started */
817         
818         // Register the cleanup function to take care of when we exit.
819         citthread_cleanup_push(ctdl_internal_thread_cleanup, NULL);
820         // Get our thread data structure
821         CtdlThreadAllocTSD();
822         CT = this_thread;
823         this_thread->pid = getpid();
824         memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval));     /* Changed state so mark it. */
825         /* Only change to running state if we weren't asked to stop during the create cycle
826          * Other wise there is a window to allow this threads creation to continue to full grown and
827          * therby prevent a shutdown of the server.
828          */
829         if (!CtdlThreadCheckStop())
830         {
831                 citthread_mutex_lock(&this_thread->ThreadMutex);
832                 this_thread->state = CTDL_THREAD_RUNNING;
833                 citthread_mutex_unlock(&this_thread->ThreadMutex);
834         }
835         end_critical_section(S_THREAD_LIST);
836         
837         // Register for tracing
838         #ifdef HAVE_BACKTRACE
839 ///     eCrash_RegisterThread(this_thread->name, 0);
840         #endif
841         
842         // Tell the world we are here
843         CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (0x%08lx).\n",
844                 this_thread->name, this_thread->tid);
845         
846         /*
847          * run the thread to do the work but only if we haven't been asked to stop
848          */
849         if (!CtdlThreadCheckStop())
850                 ret = (this_thread->thread_func)(this_thread->user_args);
851         
852         /*
853          * Our thread is exiting either because it wanted to end or because the server is stopping
854          * We need to clean up
855          */
856         citthread_cleanup_pop(1);       // Execute our cleanup routine and remove it
857         
858         return(ret);
859 }
860
861
862
863
864 /*
865  * Function to initialise an empty thread structure
866  */
867 CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode *this_thread, long flags)
868 {
869         int ret = 0;
870         
871         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
872         memset (this_thread, 0, sizeof(CtdlThreadNode));
873         
874         /* Create the mutex's early so we can use them */
875         citthread_mutex_init (&(this_thread->ThreadMutex), NULL);
876         citthread_cond_init (&(this_thread->ThreadCond), NULL);
877         citthread_mutex_init (&(this_thread->SleepMutex), NULL);
878         citthread_cond_init (&(this_thread->SleepCond), NULL);
879         
880         this_thread->state = CTDL_THREAD_CREATE;
881         
882         if ((ret = citthread_attr_init(&this_thread->attr))) {
883                 citthread_mutex_unlock(&this_thread->ThreadMutex);
884                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
885                 citthread_cond_destroy(&(this_thread->ThreadCond));
886                 citthread_mutex_destroy(&(this_thread->SleepMutex));
887                 citthread_cond_destroy(&(this_thread->SleepCond));
888                 CtdlLogPrintf(CTDL_EMERG, "Thread system, citthread_attr_init: %s\n", strerror(ret));
889                 free(this_thread);
890                 return NULL;
891         }
892
893         /* Our per-thread stacks need to be bigger than the default size,
894          * otherwise the MIME parser crashes on FreeBSD, and the IMAP service
895          * crashes on 64-bit Linux.
896          */
897         if (flags & CTDLTHREAD_BIGSTACK)
898         {
899 #ifdef WITH_THREADLOG
900                 CtdlLogPrintf(CTDL_INFO, "Thread system. Creating BIG STACK thread.\n");
901 #endif
902                 if ((ret = citthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) {
903                         citthread_mutex_unlock(&this_thread->ThreadMutex);
904                         citthread_mutex_destroy(&(this_thread->ThreadMutex));
905                         citthread_cond_destroy(&(this_thread->ThreadCond));
906                         citthread_mutex_destroy(&(this_thread->SleepMutex));
907                         citthread_cond_destroy(&(this_thread->SleepCond));
908                         citthread_attr_destroy(&this_thread->attr);
909                         CtdlLogPrintf(CTDL_EMERG, "Thread system, citthread_attr_setstacksize: %s\n",
910                                 strerror(ret));
911                         free(this_thread);
912                         return NULL;
913                 }
914         }
915
916         /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the
917          * load average for the system. If we don't do this then we create a mass of threads at the same time 
918          * because the creation didn't affect the load average.
919          */
920         this_thread->avg_blocked = 2;
921         
922         return (this_thread);
923 }
924
925
926
927  
928 /*
929  * Internal function to create a thread.
930  */ 
931 CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args)
932 {
933         int ret = 0;
934         CtdlThreadNode *this_thread;
935
936         if (num_threads >= 32767)
937         {
938                 CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n");
939                 return NULL;
940         }
941                 
942         this_thread = malloc(sizeof(CtdlThreadNode));
943         if (this_thread == NULL) {
944                 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
945                 return NULL;
946         }
947         
948         /* Initialise the thread structure */
949         if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL)
950         {
951                 free(this_thread);
952                 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't initialise CtdlThreadNode, exiting\n");
953                 return NULL;
954         }
955         /*
956          * If we got here we are going to create the thread so we must initilise the structure
957          * first because most implimentations of threading can't create it in a stopped state
958          * and it might want to do things with its structure that aren't initialised otherwise.
959          */
960         if(name)
961         {
962                 this_thread->name = name;
963         }
964         else
965         {
966                 this_thread->name = "Un-named Thread";
967         }
968         
969         this_thread->flags = flags;
970         this_thread->thread_func = thread_func;
971         this_thread->user_args = args;
972         
973         begin_critical_section(S_THREAD_LIST);
974         /*
975          * We pass this_thread into the thread as its args so that it can find out information
976          * about itself and it has a bit of storage space for itself, not to mention that the REAL
977          * thread function needs to finish off the setup of the structure
978          */
979         if ((ret = citthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0))
980         {
981                 end_critical_section(S_THREAD_LIST);
982                 CtdlLogPrintf(CTDL_ALERT, "Thread system, Can't create thread: %s\n",
983                         strerror(ret));
984                 citthread_mutex_unlock(&this_thread->ThreadMutex);
985                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
986                 citthread_cond_destroy(&(this_thread->ThreadCond));
987                 citthread_mutex_destroy(&(this_thread->SleepMutex));
988                 citthread_cond_destroy(&(this_thread->SleepCond));
989                 citthread_attr_destroy(&this_thread->attr);
990                 free(this_thread);
991                 return NULL;
992         }
993         this_thread->reltid = next_tid;
994         next_tid++;
995         num_threads++;  // Increase the count of threads in the system.
996         if(this_thread->flags & CTDLTHREAD_WORKER)
997                 num_workers++;
998
999         this_thread->next = CtdlThreadList;
1000         CtdlThreadList = this_thread;
1001         if (this_thread->next)
1002                 this_thread->next->prev = this_thread;
1003         ctdl_thread_internal_calc_loadavg();
1004         
1005         end_critical_section(S_THREAD_LIST);
1006         
1007         return this_thread;
1008 }
1009
1010 /*
1011  * Wrapper function to create a thread
1012  * ensures the critical section and other protections are in place.
1013  * char *name = name to give to thread, if NULL, use generic name
1014  * int flags = flags to determine type of thread and standard facilities
1015  */
1016 CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args)
1017 {
1018         CtdlThreadNode *ret = NULL;
1019         
1020         ret = ctdl_internal_create_thread(name, flags, thread_func, args);
1021         return ret;
1022 }
1023
1024
1025
1026 /*
1027  * Internal function to schedule a thread.
1028  * Must be called from within a S_THREAD_LIST critical section
1029  */ 
1030 CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) (void *arg), void *args, time_t when)
1031 {
1032         CtdlThreadNode *this_thread;
1033
1034         if (num_threads >= 32767)
1035         {
1036                 CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n");
1037                 return NULL;
1038         }
1039                 
1040         this_thread = malloc(sizeof(CtdlThreadNode));
1041         if (this_thread == NULL) {
1042                 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
1043                 return NULL;
1044         }
1045         /* Initialise the thread structure */
1046         if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL)
1047         {
1048                 free(this_thread);
1049                 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't initialise CtdlThreadNode, exiting\n");
1050                 return NULL;
1051         }
1052
1053         /*
1054          * If we got here we are going to create the thread so we must initilise the structure
1055          * first because most implimentations of threading can't create it in a stopped state
1056          * and it might want to do things with its structure that aren't initialised otherwise.
1057          */
1058         if(name)
1059         {
1060                 this_thread->name = name;
1061         }
1062         else
1063         {
1064                 this_thread->name = "Un-named Thread";
1065         }
1066         
1067         this_thread->flags = flags;
1068         this_thread->thread_func = thread_func;
1069         this_thread->user_args = args;
1070         
1071         /*
1072          * When to start this thread
1073          */
1074         this_thread->when = when;
1075
1076         begin_critical_section(S_SCHEDULE_LIST);
1077         this_thread->next = CtdlThreadSchedList;
1078         CtdlThreadSchedList = this_thread;
1079         if (this_thread->next)
1080                 this_thread->next->prev = this_thread;
1081         end_critical_section(S_SCHEDULE_LIST);
1082         
1083         return this_thread;
1084 }
1085
1086
1087
1088 CtdlThreadNode *ctdl_thread_internal_start_scheduled (CtdlThreadNode *this_thread)
1089 {
1090         int ret = 0;
1091         
1092         begin_critical_section(S_THREAD_LIST);
1093         /*
1094          * We pass this_thread into the thread as its args so that it can find out information
1095          * about itself and it has a bit of storage space for itself, not to mention that the REAL
1096          * thread function needs to finish off the setup of the structure
1097          */
1098         if ((ret = citthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0))
1099         {
1100                 end_critical_section(S_THREAD_LIST);
1101                 CtdlLogPrintf(CTDL_DEBUG, "Failed to start scheduled thread \"%s\": %s\n", this_thread->name, strerror(ret));
1102                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
1103                 citthread_cond_destroy(&(this_thread->ThreadCond));
1104                 citthread_mutex_destroy(&(this_thread->SleepMutex));
1105                 citthread_cond_destroy(&(this_thread->SleepCond));
1106                 citthread_attr_destroy(&this_thread->attr);
1107                 free(this_thread);
1108                 return NULL;
1109         }
1110         
1111         
1112         num_threads++;  // Increase the count of threads in the system.
1113         if(this_thread->flags & CTDLTHREAD_WORKER)
1114                 num_workers++;
1115
1116         this_thread->next = CtdlThreadList;
1117         CtdlThreadList = this_thread;
1118         if (this_thread->next)
1119                 this_thread->next->prev = this_thread;
1120         
1121         ctdl_thread_internal_calc_loadavg();
1122         end_critical_section(S_THREAD_LIST);
1123         
1124         
1125         return this_thread;
1126 }
1127
1128
1129
1130 void ctdl_thread_internal_check_scheduled(void)
1131 {
1132         CtdlThreadNode *this_thread, *that_thread;
1133         time_t now;
1134         
1135         /* Don't start scheduled threads if the system wants single user mode */
1136         if (CtdlWantSingleUser())
1137                 return;
1138         
1139         if (try_critical_section(S_SCHEDULE_LIST))
1140                 return; /* If this list is locked we wait till the next chance */
1141         
1142         now = time(NULL);
1143         
1144 #ifdef WITH_THREADLOG
1145         CtdlLogPrintf(CTDL_DEBUG, "Checking for scheduled threads to start.\n");
1146 #endif
1147
1148         this_thread = CtdlThreadSchedList;
1149         while(this_thread)
1150         {
1151                 that_thread = this_thread;
1152                 this_thread = this_thread->next;
1153                 
1154                 if (now > that_thread->when)
1155                 {
1156                         /* Unlink from schedule list */
1157                         if (that_thread->prev)
1158                                 that_thread->prev->next = that_thread->next;
1159                         else
1160                                 CtdlThreadSchedList = that_thread->next;
1161                         if (that_thread->next)
1162                                 that_thread->next->prev = that_thread->prev;
1163                                 
1164                         that_thread->next = that_thread->prev = NULL;
1165 #ifdef WITH_THREADLOG
1166                         CtdlLogPrintf(CTDL_DEBUG, "About to start scheduled thread \"%s\".\n", that_thread->name);
1167 #endif
1168                         if (CT->state > CTDL_THREAD_STOP_REQ)
1169                         {       /* Only start it if the system is not stopping */
1170                                 if (ctdl_thread_internal_start_scheduled (that_thread))
1171                                 {
1172 #ifdef WITH_THREADLOG
1173                                         CtdlLogPrintf(CTDL_INFO, "Thread system, Started a scheduled thread \"%s\" (0x%08lx).\n",
1174                                                 that_thread->name, that_thread->tid);
1175 #endif
1176                                 }
1177                         }
1178                 }
1179 #ifdef WITH_THREADLOG
1180                 else
1181                 {
1182                         CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" will start in %ld seconds.\n",
1183                                 that_thread->name, that_thread->when - time(NULL));
1184                 }
1185 #endif
1186         }
1187         end_critical_section(S_SCHEDULE_LIST);
1188 }
1189
1190
1191 /*
1192  * A warapper function for select so we can show a thread as blocked
1193  */
1194 int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
1195 {
1196         int ret = 0;
1197         
1198         ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1199         if (!CtdlThreadCheckStop())
1200                 ret = select(n, readfds, writefds, exceptfds, timeout);
1201         /**
1202          * If the select returned <= 0 then it failed due to an error
1203          * or timeout so this thread could stop if asked to do so.
1204          * Anything else means it needs to continue unless the system is shutting down
1205          */
1206         if (ret > 0)
1207         {
1208                 /**
1209                  * The select says this thread needs to do something useful.
1210                  * This thread was in an idle state so it may have been asked to stop
1211                  * but if the system isn't shutting down this thread is no longer
1212                  * idle and select has given it a task to do so it must not stop
1213                  * In this condition we need to force it into the running state.
1214                  * CtdlThreadGC will clear its ticker for us.
1215                  *
1216                  * FIXME: there is still a small hole here. It is possible for the sequence of locking
1217                  * to allow the state to get changed to STOP_REQ just after this code if the other thread
1218                  * has decided to change the state before this lock, it there fore has to wait till the lock
1219                  * completes but it will continue to change the state. We need something a bit better here.
1220                  */
1221                 citthread_mutex_lock(&CT->ThreadMutex); /* To prevent race condition of a sleeping thread */
1222                 if (GC_thread->state > CTDL_THREAD_STOP_REQ && CT->state <= CTDL_THREAD_STOP_REQ)
1223                 {
1224                         CtdlLogPrintf(CTDL_DEBUG, "Thread %s (0x%08lx) refused stop request.\n", CT->name, CT->tid);
1225                         CT->state = CTDL_THREAD_RUNNING;
1226                 }
1227                 citthread_mutex_unlock(&CT->ThreadMutex);
1228         }
1229
1230         ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
1231
1232         return ret;
1233 }
1234
1235
1236
1237 void *new_worker_thread(void *arg);
1238 extern void close_masters (void);
1239
1240
1241 void *simulation_worker (void*arg) {
1242         struct CitContext *this;
1243
1244         this = CreateNewContext();
1245         CtdlThreadSleep(1);
1246         this->kill_me = 1;
1247         this->state = CON_IDLE;
1248         dead_session_purge(1);
1249         begin_critical_section(S_SESSION_TABLE);
1250         stats_done++;
1251         end_critical_section(S_SESSION_TABLE);
1252         return NULL;
1253 }
1254
1255
1256 void *simulation_thread (void *arg)
1257 {
1258         long stats = statcount;
1259
1260         while(stats && !CtdlThreadCheckStop()) {
1261                 CtdlThreadCreate("Connection simulation worker", CTDLTHREAD_BIGSTACK, simulation_worker, NULL);
1262                 stats--;
1263         }
1264         CtdlThreadStopAll();
1265         return NULL;
1266 }
1267
1268 void go_threading(void)
1269 {
1270         int i;
1271         CtdlThreadNode *last_worker;
1272         struct timeval start, now, result;
1273         double last_duration;
1274
1275         /*
1276          * Initialise the thread system
1277          */
1278         ctdl_thread_internal_init();
1279
1280         /* Second call to module init functions now that threading is up */
1281         if (!statcount) {
1282                 initialise_modules(1);
1283                 CtdlThreadCreate("select_on_master", CTDLTHREAD_BIGSTACK, select_on_master, NULL);
1284         }
1285         else {
1286                 CtdlLogPrintf(CTDL_EMERG, "Running connection simulation stats\n");
1287                 gettimeofday(&start, NULL);
1288                 CtdlThreadCreate("Connection simulation master", CTDLTHREAD_BIGSTACK, simulation_thread, NULL);
1289         }
1290
1291
1292         /*
1293          * This thread is now used for garbage collection of other threads in the thread list
1294          */
1295         CtdlLogPrintf(CTDL_INFO, "Startup thread %d becoming garbage collector,\n", citthread_self());
1296
1297         /*
1298          * We do a lot of locking and unlocking of the thread list in here.
1299          * We do this so that we can repeatedly release time for other threads
1300          * that may be waiting on the thread list.
1301          * We are a low priority thread so we can afford to do this
1302          */
1303         
1304         while (CtdlThreadGetCount())
1305         {
1306                 if (CT->signal)
1307                         exit_signal = CT->signal;
1308                 if (exit_signal)
1309                 {
1310                         CtdlThreadStopAll();
1311                 }
1312                 check_sched_shutdown();
1313                 if (CT->state > CTDL_THREAD_STOP_REQ)
1314                 {
1315                         begin_critical_section(S_THREAD_LIST);
1316                         ctdl_thread_internal_calc_loadavg();
1317                         end_critical_section(S_THREAD_LIST);
1318                         
1319                         ctdl_thread_internal_check_scheduled(); /* start scheduled threads */
1320                 }
1321                 
1322                 /* Reduce the size of the worker thread pool if necessary. */
1323                 if ((CtdlThreadGetWorkers() > config.c_min_workers + 1) && (CtdlThreadWorkerAvg < 20) && (CT->state > CTDL_THREAD_STOP_REQ))
1324                 {
1325                         /* Ask a worker thread to stop as we no longer need it */
1326                         begin_critical_section(S_THREAD_LIST);
1327                         last_worker = CtdlThreadList;
1328                         while (last_worker)
1329                         {
1330                                 citthread_mutex_lock(&last_worker->ThreadMutex);
1331                                 if (last_worker->flags & CTDLTHREAD_WORKER && (last_worker->state > CTDL_THREAD_STOPPING) && (last_worker->Context == NULL))
1332                                 {
1333                                         citthread_mutex_unlock(&last_worker->ThreadMutex);
1334                                         break;
1335                                 }
1336                                 citthread_mutex_unlock(&last_worker->ThreadMutex);
1337                                 last_worker = last_worker->next;
1338                         }
1339                         end_critical_section(S_THREAD_LIST);
1340                         if (last_worker)
1341                         {
1342 #ifdef WITH_THREADLOG
1343                                 CtdlLogPrintf(CTDL_DEBUG, "Thread system, stopping excess worker thread \"%s\" (0x%08lx).\n",
1344                                         last_worker->name,
1345                                         last_worker->tid
1346                                         );
1347 #endif
1348                                 CtdlThreadStop(last_worker);
1349                         }
1350                 }
1351         
1352                 /*
1353                  * If all our workers are working hard, start some more to help out
1354                  * with things
1355                  */
1356                 /* FIXME: come up with a better way to dynamically alter the number of threads
1357                  * based on the system load
1358                  */
1359                 if (!statcount) {
1360                 if ((((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkerAvg() > 60)) || CtdlThreadGetWorkers() < config.c_min_workers) && (CT->state > CTDL_THREAD_STOP_REQ))
1361                 {
1362                         /* Only start new threads if we are not going to overload the machine */
1363                         /* Temporarily set to 10 should be enough to make sure we don't stranglew the server
1364                          * at least until we make this a config option */
1365                         if (CtdlThreadGetLoadAvg() < ((double)10.00)) {
1366                                 for (i=0; i<5 ; i++) {
1367                                         CtdlThreadCreate("Worker Thread",
1368                                                 CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER,
1369                                                 worker_thread,
1370                                                 NULL
1371                                                 );
1372                                 }
1373                         }
1374                         else
1375                                 CtdlLogPrintf (CTDL_WARNING, "Server strangled due to machine load average too high.\n");
1376                 }
1377                 }
1378
1379                 CtdlThreadGC();
1380
1381                 if (CtdlThreadGetCount() <= 1) // Shutting down clean up the garbage collector
1382                 {
1383                         CtdlThreadGC();
1384                 }
1385                 
1386 #ifdef THREADS_USESIGNALS
1387                 if (CtdlThreadGetCount() && CT->state > CTDL_THREAD_STOP_REQ)
1388 #else
1389                 if (CtdlThreadGetCount())
1390 #endif
1391                         CtdlThreadSleep(1);
1392         }
1393         /*
1394          * If the above loop exits we must be shutting down since we obviously have no threads
1395          */
1396         ctdl_thread_internal_cleanup();
1397
1398         if (statcount) {
1399                 gettimeofday(&now, NULL);
1400                 timersub(&now, &start, &result);
1401                 last_duration = (double)result.tv_sec + ((double)result.tv_usec / (double) 1000000);
1402                 CtdlLogPrintf(CTDL_EMERG, "Simulated %ld connections in %f seconds\n", stats_done, last_duration);
1403         }
1404 }
1405
1406
1407
1408