2 * $Id: sysdep.c 5882 2007-12-13 19:46:05Z davew $
4 * Citadel "system dependent" stuff.
5 * See copyright.txt for copyright information.
7 * Here's where we have the Citadel thread implimentation
11 #include <sys/types.h>
13 #include <sys/socket.h>
18 #if TIME_WITH_SYS_TIME
19 # include <sys/time.h>
23 # include <sys/time.h>
30 #include "ctdl_module.h"
31 #include "modules_init.h"
32 #include "housekeeping.h"
34 #include "citserver.h"
35 #include "sysdep_decls.h"
38 * define this to use the new worker_thread method of handling connections
43 * New thread interface.
44 * To create a thread you must call one of the create thread functions.
45 * You must pass it the address of (a pointer to a CtdlThreadNode initialised to NULL) like this
46 * struct CtdlThreadNode *node = NULL;
48 * If the thread is created *node will point to the thread control structure for the created thread.
49 * If the thread creation fails *node remains NULL
50 * Do not free the memory pointed to by *node, it doesn't belong to you.
51 * This new interface duplicates much of the eCrash stuff. We should go for closer integration since that would
52 * remove the need for the calls to eCrashRegisterThread and friends
55 static int num_threads = 0; /* Current number of threads */
56 static int num_workers = 0; /* Current number of worker threads */
58 CtdlThreadNode *CtdlThreadList = NULL;
59 CtdlThreadNode *CtdlThreadSchedList = NULL;
61 static citthread_t GC_thread;
62 static char *CtdlThreadStates[CTDL_THREAD_LAST_STATE];
63 double CtdlThreadLoadAvg = 0;
64 double CtdlThreadWorkerAvg = 0;
65 citthread_key_t ThreadKey;
67 citthread_mutex_t Critters[MAX_SEMAPHORES]; /* Things needing locking */
70 citthread_cond_t worker_block;
71 citthread_mutex_t worker_block_mutex;
74 void InitialiseSemaphores(void)
78 /* Set up a bunch of semaphores to be used for critical sections */
79 for (i = 0; i < MAX_SEMAPHORES; ++i) {
80 citthread_mutex_init(&Critters[i], NULL);
88 * Obtain a semaphore lock to begin a critical section.
89 * but only if no one else has one
91 int try_critical_section(int which_one)
93 /* For all types of critical sections except those listed here,
94 * ensure nobody ever tries to do a critical section within a
95 * transaction; this could lead to deadlock.
97 if ((which_one != S_FLOORCACHE)
98 #ifdef DEBUG_MEMORY_LEAKS
99 && (which_one != S_DEBUGMEMLEAKS)
101 && (which_one != S_RPLIST)
105 return (citthread_mutex_trylock(&Critters[which_one]));
110 * Obtain a semaphore lock to begin a critical section.
112 void begin_critical_section(int which_one)
114 /* CtdlLogPrintf(CTDL_DEBUG, "begin_critical_section(%d)\n", which_one); */
116 /* For all types of critical sections except those listed here,
117 * ensure nobody ever tries to do a critical section within a
118 * transaction; this could lead to deadlock.
120 if ((which_one != S_FLOORCACHE)
121 #ifdef DEBUG_MEMORY_LEAKS
122 && (which_one != S_DEBUGMEMLEAKS)
124 && (which_one != S_RPLIST)
128 citthread_mutex_lock(&Critters[which_one]);
132 * Release a semaphore lock to end a critical section.
134 void end_critical_section(int which_one)
136 citthread_mutex_unlock(&Critters[which_one]);
141 * A function to destroy the TSD
143 static void ctdl_thread_internal_dest_tsd(void *arg)
153 * A function to initialise the thread TSD
155 void ctdl_thread_internal_init_tsd(void)
160 citthread_key_create(&ThreadKey,
161 ctdl_thread_internal_dest_tsd))) {
162 lprintf(CTDL_EMERG, "citthread_key_create: %s\n",
166 citthread_mutex_init (&worker_block_mutex, NULL);
167 citthread_cond_init(&worker_block, NULL);
171 * Ensure that we have a key for thread-specific data.
173 * This should be called immediately after startup by any thread
176 void CtdlThreadAllocTSD(void)
180 if (citthread_getspecific(ThreadKey) != NULL)
183 tsd = malloc(sizeof(ThreadTSD));
187 memset(tsd->cursors, 0, sizeof tsd->cursors);
190 citthread_setspecific(ThreadKey, tsd);
194 void ctdl_thread_internal_free_tsd(void)
196 ctdl_thread_internal_dest_tsd(citthread_getspecific(ThreadKey));
197 citthread_setspecific(ThreadKey, NULL);
201 void ctdl_thread_internal_cleanup(void)
204 CtdlThreadNode *this_thread, *that_thread;
206 for (i = 0; i < CTDL_THREAD_LAST_STATE; i++) {
207 free(CtdlThreadStates[i]);
210 /* Clean up the scheduled thread list */
211 this_thread = CtdlThreadSchedList;
212 while (this_thread) {
213 that_thread = this_thread;
214 this_thread = this_thread->next;
215 citthread_mutex_destroy(&that_thread->ThreadMutex);
216 citthread_cond_destroy(&that_thread->ThreadCond);
217 citthread_mutex_destroy(&that_thread->SleepMutex);
218 citthread_cond_destroy(&that_thread->SleepCond);
219 citthread_attr_destroy(&that_thread->attr);
222 ctdl_thread_internal_free_tsd();
225 void ctdl_thread_internal_init(void)
227 CtdlThreadNode *this_thread;
230 GC_thread = citthread_self();
231 CtdlThreadStates[CTDL_THREAD_INVALID] = strdup("Invalid Thread");
232 CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread");
233 CtdlThreadStates[CTDL_THREAD_CREATE] =
234 strdup("Thread being Created");
235 CtdlThreadStates[CTDL_THREAD_CANCELLED] =
236 strdup("Thread Cancelled");
237 CtdlThreadStates[CTDL_THREAD_EXITED] = strdup("Thread Exited");
238 CtdlThreadStates[CTDL_THREAD_STOPPING] = strdup("Thread Stopping");
239 CtdlThreadStates[CTDL_THREAD_STOP_REQ] =
240 strdup("Thread Stop Requested");
241 CtdlThreadStates[CTDL_THREAD_SLEEPING] = strdup("Thread Sleeping");
242 CtdlThreadStates[CTDL_THREAD_RUNNING] = strdup("Thread Running");
243 CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked");
245 /* Get ourself a thread entry */
246 this_thread = malloc(sizeof(CtdlThreadNode));
247 if (this_thread == NULL) {
248 CtdlLogPrintf(CTDL_EMERG,
249 "Thread system, can't allocate CtdlThreadNode, exiting\n");
252 // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
253 memset(this_thread, 0, sizeof(CtdlThreadNode));
255 citthread_mutex_init(&(this_thread->ThreadMutex), NULL);
256 citthread_cond_init(&(this_thread->ThreadCond), NULL);
257 citthread_mutex_init(&(this_thread->SleepMutex), NULL);
258 citthread_cond_init(&(this_thread->SleepCond), NULL);
260 /* We are garbage collector so create us as running */
261 this_thread->state = CTDL_THREAD_RUNNING;
263 if ((ret = citthread_attr_init(&this_thread->attr))) {
264 CtdlLogPrintf(CTDL_EMERG,
265 "Thread system, citthread_attr_init: %s\n",
271 this_thread->name = "Garbage Collection Thread";
273 this_thread->tid = GC_thread;
276 num_threads++; // Increase the count of threads in the system.
278 this_thread->next = CtdlThreadList;
279 CtdlThreadList = this_thread;
280 if (this_thread->next)
281 this_thread->next->prev = this_thread;
282 /* Set up start times */
283 gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */
284 memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof(struct timeval)); /* Changed state so mark it. */
289 * A function to update a threads load averages
291 void ctdl_thread_internal_update_avgs(CtdlThreadNode * this_thread)
293 struct timeval now, result;
294 double last_duration;
296 gettimeofday(&now, NULL);
297 timersub(&now, &(this_thread->last_state_change), &result);
298 /* I don't think these mutex's are needed here */
299 citthread_mutex_lock(&this_thread->ThreadMutex);
300 // result now has a timeval for the time we spent in the last state since we last updated
302 (double) result.tv_sec +
303 ((double) result.tv_usec / (double) 1000000);
304 if (this_thread->state == CTDL_THREAD_SLEEPING)
305 this_thread->avg_sleeping += last_duration;
306 if (this_thread->state == CTDL_THREAD_RUNNING)
307 this_thread->avg_running += last_duration;
308 if (this_thread->state == CTDL_THREAD_BLOCKED)
309 this_thread->avg_blocked += last_duration;
310 memcpy(&this_thread->last_state_change, &now,
311 sizeof(struct timeval));
312 citthread_mutex_unlock(&this_thread->ThreadMutex);
316 * A function to chenge the state of a thread
318 void ctdl_thread_internal_change_state(CtdlThreadNode * this_thread,
319 enum CtdlThreadState new_state)
322 * Wether we change state or not we need update the load values
324 ctdl_thread_internal_update_avgs(this_thread);
325 /* This mutex not needed here? */
326 citthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */
327 if ((new_state == CTDL_THREAD_STOP_REQ)
328 && (this_thread->state > CTDL_THREAD_STOP_REQ))
329 this_thread->state = new_state;
330 if (((new_state == CTDL_THREAD_SLEEPING)
331 || (new_state == CTDL_THREAD_BLOCKED))
332 && (this_thread->state == CTDL_THREAD_RUNNING))
333 this_thread->state = new_state;
334 if ((new_state == CTDL_THREAD_RUNNING)
335 && ((this_thread->state == CTDL_THREAD_SLEEPING)
336 || (this_thread->state == CTDL_THREAD_BLOCKED)))
337 this_thread->state = new_state;
338 citthread_mutex_unlock(&this_thread->ThreadMutex);
343 * A function to tell all threads to exit
345 void CtdlThreadStopAll(void)
347 //FIXME: The signalling of the condition should not be in the critical_section
348 // We need to build a list of threads we are going to signal and then signal them afterwards
350 CtdlThreadNode *this_thread;
352 begin_critical_section(S_THREAD_LIST);
353 this_thread = CtdlThreadList;
354 while (this_thread) {
355 #ifdef THREADS_USESIGNALS
356 citthread_killl(this_thread->tid, SIGHUP);
358 citthread_kill(this_thread->tid, SIGUSR1);
359 ctdl_thread_internal_change_state(this_thread,
360 CTDL_THREAD_STOP_REQ);
361 citthread_cond_signal(&this_thread->ThreadCond);
362 citthread_cond_signal(&this_thread->SleepCond);
363 CtdlLogPrintf(CTDL_DEBUG,
364 "Thread system stopping thread \"%s\" (%ld).\n",
365 this_thread->name, this_thread->tid);
366 this_thread = this_thread->next;
368 end_critical_section(S_THREAD_LIST);
369 citthread_cond_broadcast(&worker_block);
374 * A function to wake up all sleeping threads
376 void CtdlThreadWakeAll(void)
378 CtdlThreadNode *this_thread;
380 CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n");
382 begin_critical_section(S_THREAD_LIST);
383 this_thread = CtdlThreadList;
384 while (this_thread) {
385 if (!this_thread->thread_func) {
386 citthread_cond_signal(&this_thread->ThreadCond);
387 citthread_cond_signal(&this_thread->SleepCond);
389 this_thread = this_thread->next;
391 end_critical_section(S_THREAD_LIST);
396 * A function to return the number of threads running in the system
398 int CtdlThreadGetCount(void)
403 int CtdlThreadGetWorkers(void)
408 double CtdlThreadGetWorkerAvg(void)
412 begin_critical_section(S_THREAD_LIST);
413 ret = CtdlThreadWorkerAvg;
414 end_critical_section(S_THREAD_LIST);
418 double CtdlThreadGetLoadAvg(void)
422 begin_critical_section(S_THREAD_LIST);
423 ret = CtdlThreadLoadAvg;
424 end_critical_section(S_THREAD_LIST);
432 * A function to rename a thread
433 * Returns a const char *
435 const char *CtdlThreadName(const char *name)
437 const char *old_name;
440 CtdlLogPrintf(CTDL_WARNING,
441 "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n",
453 * A function to force a thread to exit
455 void CtdlThreadCancel(CtdlThreadNode * thread)
457 CtdlThreadNode *this_thread;
462 this_thread = thread;
464 CtdlLogPrintf(CTDL_EMERG,
465 "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n");
470 if (!this_thread->thread_func) {
471 CtdlLogPrintf(CTDL_EMERG,
472 "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n");
477 ctdl_thread_internal_change_state(this_thread,
478 CTDL_THREAD_CANCELLED);
479 citthread_cancel(this_thread->tid);
484 * A function for a thread to check if it has been asked to stop
486 int CtdlThreadCheckStop(void)
491 CtdlLogPrintf(CTDL_EMERG,
492 "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n");
499 #ifdef THREADS_USERSIGNALS
501 CtdlLogPrintf(CTDL_DEBUG,
502 "Thread \"%s\" caught signal %d.\n",
503 CT->name, CT->signal);
505 if (state == CTDL_THREAD_STOP_REQ) {
506 CT->state = CTDL_THREAD_STOPPING;
508 } else if ((state < CTDL_THREAD_STOP_REQ)
509 && (state > CTDL_THREAD_CREATE)) {
517 * A function to ask a thread to exit
518 * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit
520 void CtdlThreadStop(CtdlThreadNode * thread)
522 CtdlThreadNode *this_thread;
527 this_thread = thread;
530 if (!(this_thread->thread_func))
531 return; // Don't stop garbage collector
532 #ifdef THREADS_USESIGNALS
533 citthread_kill(this_thread->tid, SIGHUP);
535 ctdl_thread_internal_change_state(this_thread,
536 CTDL_THREAD_STOP_REQ);
537 citthread_cond_signal(&this_thread->ThreadCond);
538 citthread_cond_signal(&this_thread->SleepCond);
542 * So we now have a sleep command that works with threads but it is in seconds
544 void CtdlThreadSleep(int secs)
546 struct timespec wake_time;
547 struct timeval time_now;
551 CtdlLogPrintf(CTDL_WARNING,
552 "CtdlThreadSleep() called by something that is not a thread. Should we die?\n");
556 memset(&wake_time, 0, sizeof(struct timespec));
557 gettimeofday(&time_now, NULL);
558 wake_time.tv_sec = time_now.tv_sec + secs;
559 wake_time.tv_nsec = time_now.tv_usec * 10;
561 ctdl_thread_internal_change_state(CT, CTDL_THREAD_SLEEPING);
563 citthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */
564 citthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex,
566 citthread_mutex_unlock(&CT->ThreadMutex);
568 ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
573 * Routine to clean up our thread function on exit
575 static void ctdl_internal_thread_cleanup(void *arg)
578 * In here we were called by the current thread because it is exiting
579 * NB. WE ARE THE CURRENT THREAD
581 CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n",
584 #ifdef HAVE_BACKTRACE
585 eCrash_UnregisterThread();
588 citthread_mutex_lock(&CT->ThreadMutex);
589 CT->state = CTDL_THREAD_EXITED; // needs to be last thing else house keeping will unlink us too early
590 citthread_mutex_unlock(&CT->ThreadMutex);
594 * A quick function to show the load averages
596 void ctdl_thread_internal_calc_loadavg(void)
598 CtdlThreadNode *that_thread;
599 double load_avg, worker_avg;
602 that_thread = CtdlThreadList;
605 while (that_thread) {
606 /* Update load averages */
607 ctdl_thread_internal_update_avgs(that_thread);
608 citthread_mutex_lock(&that_thread->ThreadMutex);
609 that_thread->load_avg =
610 (that_thread->avg_sleeping +
611 that_thread->avg_running) /
612 (that_thread->avg_sleeping + that_thread->avg_running +
613 that_thread->avg_blocked) * 100;
614 that_thread->avg_sleeping /= 2;
615 that_thread->avg_running /= 2;
616 that_thread->avg_blocked /= 2;
617 load_avg += that_thread->load_avg;
618 if (that_thread->flags & CTDLTHREAD_WORKER) {
619 worker_avg += that_thread->load_avg;
622 #ifdef WITH_THREADLOG
623 CtdlLogPrintf(CTDL_DEBUG,
624 "CtdlThread, \"%s\" (%lu) \"%s\" %.2f %.2f %.2f %.2f\n",
625 that_thread->name, that_thread->tid,
626 CtdlThreadStates[that_thread->state],
627 that_thread->avg_sleeping,
628 that_thread->avg_running,
629 that_thread->avg_blocked,
630 that_thread->load_avg);
632 citthread_mutex_unlock(&that_thread->ThreadMutex);
633 that_thread = that_thread->next;
635 CtdlThreadLoadAvg = load_avg / num_threads;
636 CtdlThreadWorkerAvg = worker_avg / workers;
637 #ifdef WITH_THREADLOG
638 CtdlLogPrintf(CTDL_INFO,
639 "System load average %.2f, workers averag %.2f, threads %d, workers %d, sessions %d\n",
640 CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads,
641 num_workers, num_sessions);
647 * Garbage collection routine.
648 * Gets called by main() in a loop to clean up the thread list periodically.
650 void CtdlThreadGC(void)
652 CtdlThreadNode *this_thread, *that_thread;
653 int workers = 0, sys_workers;
656 begin_critical_section(S_THREAD_LIST);
658 /* Handle exiting of garbage collector thread */
659 if (num_threads == 1)
660 CtdlThreadList->state = CTDL_THREAD_EXITED;
662 #ifdef WITH_THREADLOG
663 CtdlLogPrintf(CTDL_DEBUG,
664 "Thread system running garbage collection.\n");
667 * Woke up to do garbage collection
669 this_thread = CtdlThreadList;
670 while (this_thread) {
671 that_thread = this_thread;
672 this_thread = this_thread->next;
674 /* Do we need to clean up this thread? */
675 if (that_thread->state != CTDL_THREAD_EXITED) {
676 if (that_thread->flags & CTDLTHREAD_WORKER)
677 workers++; /* Sanity check on number of worker threads */
681 if (citthread_equal(that_thread->tid, citthread_self()) && that_thread->thread_func) { /* Sanity check */
682 end_critical_section(S_THREAD_LIST);
683 CtdlLogPrintf(CTDL_EMERG,
684 "Thread system PANIC, a thread is trying to clean up after itself.\n");
689 if (num_threads <= 0) { /* Sanity check */
690 end_critical_section(S_THREAD_LIST);
691 CtdlLogPrintf(CTDL_EMERG,
692 "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n");
697 if (that_thread->flags & CTDLTHREAD_WORKER)
698 num_workers--; /* This is a wroker thread so reduce the count. */
700 /* If we are unlinking the list head then the next becomes the list head */
701 if (that_thread->prev)
702 that_thread->prev->next = that_thread->next;
704 CtdlThreadList = that_thread->next;
705 if (that_thread->next)
706 that_thread->next->prev = that_thread->prev;
708 citthread_cond_signal(&that_thread->ThreadCond);
709 citthread_cond_signal(&that_thread->SleepCond); // Make sure this thread is awake
710 citthread_mutex_lock(&that_thread->ThreadMutex); // Make sure it has done what its doing
711 citthread_mutex_unlock(&that_thread->ThreadMutex);
713 * Join on the thread to do clean up and prevent memory leaks
714 * Also makes sure the thread has cleaned up after itself before we remove it from the list
715 * We can join on the garbage collector thread the join should just return EDEADLCK
717 ret = citthread_join(that_thread->tid, NULL);
719 CtdlLogPrintf(CTDL_DEBUG,
720 "Garbage collection on own thread.\n");
721 else if (ret == EINVAL)
722 CtdlLogPrintf(CTDL_DEBUG,
723 "Garbage collection, that thread already joined on.\n");
724 else if (ret == ESRCH)
725 CtdlLogPrintf(CTDL_DEBUG,
726 "Garbage collection, no thread to join on.\n");
728 CtdlLogPrintf(CTDL_DEBUG,
729 "Garbage collection, citthread_join returned an unknown error.\n");
731 * Now we own that thread entry
733 CtdlLogPrintf(CTDL_INFO,
734 "Garbage Collection for thread \"%s\" (%ld).\n",
735 that_thread->name, that_thread->tid);
736 citthread_mutex_destroy(&that_thread->ThreadMutex);
737 citthread_cond_destroy(&that_thread->ThreadCond);
738 citthread_mutex_destroy(&that_thread->SleepMutex);
739 citthread_cond_destroy(&that_thread->SleepCond);
740 citthread_attr_destroy(&that_thread->attr);
743 sys_workers = num_workers;
744 end_critical_section(S_THREAD_LIST);
746 /* Sanity check number of worker threads */
747 if (workers != sys_workers) {
748 CtdlLogPrintf(CTDL_EMERG,
749 "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n",
750 workers, sys_workers);
759 * Runtime function for a Citadel Thread.
760 * This initialises the threads environment and then calls the user supplied thread function
761 * Note that this is the REAL thread function and wraps the users thread function.
763 static void *ctdl_internal_thread_func(void *arg)
765 CtdlThreadNode *this_thread;
768 /* lock and unlock the thread list.
769 * This causes this thread to wait until all its creation stuff has finished before it
770 * can continue its execution.
772 begin_critical_section(S_THREAD_LIST);
773 this_thread = (CtdlThreadNode *) arg;
774 gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */
775 // citthread_mutex_lock(&this_thread->ThreadMutex);
777 // Register the cleanup function to take care of when we exit.
778 citthread_cleanup_push(ctdl_internal_thread_cleanup, NULL);
779 // Get our thread data structure
780 CtdlThreadAllocTSD();
782 this_thread->pid = getpid();
783 memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof(struct timeval)); /* Changed state so mark it. */
784 /* Only change to running state if we weren't asked to stop during the create cycle
785 * Other wise there is a window to allow this threads creation to continue to full grown and
786 * therby prevent a shutdown of the server.
788 // citthread_mutex_unlock(&this_thread->ThreadMutex);
790 if (!CtdlThreadCheckStop()) {
791 citthread_mutex_lock(&this_thread->ThreadMutex);
792 this_thread->state = CTDL_THREAD_RUNNING;
793 citthread_mutex_unlock(&this_thread->ThreadMutex);
795 end_critical_section(S_THREAD_LIST);
797 // Register for tracing
798 #ifdef HAVE_BACKTRACE
799 eCrash_RegisterThread(this_thread->name, 0);
802 // Tell the world we are here
803 CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n",
804 this_thread->name, this_thread->tid);
809 * run the thread to do the work but only if we haven't been asked to stop
811 if (!CtdlThreadCheckStop())
812 ret = (this_thread->thread_func) (this_thread->user_args);
815 * Our thread is exiting either because it wanted to end or because the server is stopping
816 * We need to clean up
818 citthread_cleanup_pop(1); // Execute our cleanup routine and remove it
827 * Function to initialise an empty thread structure
829 CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode *
830 this_thread, long flags)
834 // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
835 memset(this_thread, 0, sizeof(CtdlThreadNode));
837 /* Create the mutex's early so we can use them */
838 citthread_mutex_init(&(this_thread->ThreadMutex), NULL);
839 citthread_cond_init(&(this_thread->ThreadCond), NULL);
840 citthread_mutex_init(&(this_thread->SleepMutex), NULL);
841 citthread_cond_init(&(this_thread->SleepCond), NULL);
843 this_thread->state = CTDL_THREAD_CREATE;
845 if ((ret = citthread_attr_init(&this_thread->attr))) {
846 citthread_mutex_unlock(&this_thread->ThreadMutex);
847 citthread_mutex_destroy(&(this_thread->ThreadMutex));
848 citthread_cond_destroy(&(this_thread->ThreadCond));
849 citthread_mutex_destroy(&(this_thread->SleepMutex));
850 citthread_cond_destroy(&(this_thread->SleepCond));
851 CtdlLogPrintf(CTDL_EMERG,
852 "Thread system, citthread_attr_init: %s\n",
858 /* Our per-thread stacks need to be bigger than the default size,
859 * otherwise the MIME parser crashes on FreeBSD, and the IMAP service
860 * crashes on 64-bit Linux.
862 if (flags & CTDLTHREAD_BIGSTACK) {
863 #ifdef WITH_THREADLOG
864 CtdlLogPrintf(CTDL_INFO,
865 "Thread system. Creating BIG STACK thread.\n");
868 citthread_attr_setstacksize(&this_thread->attr,
870 citthread_mutex_unlock(&this_thread->ThreadMutex);
871 citthread_mutex_destroy(&
874 citthread_cond_destroy(&(this_thread->ThreadCond));
875 citthread_mutex_destroy(&
876 (this_thread->SleepMutex));
877 citthread_cond_destroy(&(this_thread->SleepCond));
878 citthread_attr_destroy(&this_thread->attr);
879 CtdlLogPrintf(CTDL_EMERG,
880 "Thread system, citthread_attr_setstacksize: %s\n",
887 /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the
888 * load average for the system. If we don't do this then we create a mass of threads at the same time
889 * because the creation didn't affect the load average.
891 this_thread->avg_blocked = 2;
893 return (this_thread);
900 * Internal function to create a thread.
902 CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags,
903 void *(*thread_func) (void
908 CtdlThreadNode *this_thread;
910 if (num_threads >= 32767) {
911 CtdlLogPrintf(CTDL_EMERG,
912 "Thread system. Thread list full.\n");
916 this_thread = malloc(sizeof(CtdlThreadNode));
917 if (this_thread == NULL) {
918 CtdlLogPrintf(CTDL_EMERG,
919 "Thread system, can't allocate CtdlThreadNode, exiting\n");
923 /* Initialise the thread structure */
924 if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) {
926 CtdlLogPrintf(CTDL_EMERG,
927 "Thread system, can't initialise CtdlThreadNode, exiting\n");
931 * If we got here we are going to create the thread so we must initilise the structure
932 * first because most implimentations of threading can't create it in a stopped state
933 * and it might want to do things with its structure that aren't initialised otherwise.
936 this_thread->name = name;
938 this_thread->name = "Un-named Thread";
941 this_thread->flags = flags;
942 this_thread->thread_func = thread_func;
943 this_thread->user_args = args;
945 // citthread_mutex_lock(&this_thread->ThreadMutex);
947 begin_critical_section(S_THREAD_LIST);
949 * We pass this_thread into the thread as its args so that it can find out information
950 * about itself and it has a bit of storage space for itself, not to mention that the REAL
951 * thread function needs to finish off the setup of the structure
954 citthread_create(&this_thread->tid, &this_thread->attr,
955 ctdl_internal_thread_func,
956 this_thread) != 0)) {
957 end_critical_section(S_THREAD_LIST);
958 CtdlLogPrintf(CTDL_ALERT,
959 "Thread system, Can't create thread: %s\n",
961 citthread_mutex_unlock(&this_thread->ThreadMutex);
962 citthread_mutex_destroy(&(this_thread->ThreadMutex));
963 citthread_cond_destroy(&(this_thread->ThreadCond));
964 citthread_mutex_destroy(&(this_thread->SleepMutex));
965 citthread_cond_destroy(&(this_thread->SleepCond));
966 citthread_attr_destroy(&this_thread->attr);
971 num_threads++; // Increase the count of threads in the system.
972 if (this_thread->flags & CTDLTHREAD_WORKER)
975 this_thread->next = CtdlThreadList;
976 CtdlThreadList = this_thread;
977 if (this_thread->next)
978 this_thread->next->prev = this_thread;
979 ctdl_thread_internal_calc_loadavg();
981 // citthread_mutex_unlock(&this_thread->ThreadMutex);
982 end_critical_section(S_THREAD_LIST);
988 * Wrapper function to create a thread
989 * ensures the critical section and other protections are in place.
990 * char *name = name to give to thread, if NULL, use generic name
991 * int flags = flags to determine type of thread and standard facilities
993 CtdlThreadNode *CtdlThreadCreate(char *name, long flags,
994 void *(*thread_func) (void *arg),
997 CtdlThreadNode *ret = NULL;
999 ret = ctdl_internal_create_thread(name, flags, thread_func, args);
1006 * Internal function to schedule a thread.
1007 * Must be called from within a S_THREAD_LIST critical section
1009 CtdlThreadNode *CtdlThreadSchedule(char *name, long flags,
1010 void *(*thread_func) (void *arg),
1011 void *args, time_t when)
1013 CtdlThreadNode *this_thread;
1015 if (num_threads >= 32767) {
1016 CtdlLogPrintf(CTDL_EMERG,
1017 "Thread system. Thread list full.\n");
1021 this_thread = malloc(sizeof(CtdlThreadNode));
1022 if (this_thread == NULL) {
1023 CtdlLogPrintf(CTDL_EMERG,
1024 "Thread system, can't allocate CtdlThreadNode, exiting\n");
1027 /* Initialise the thread structure */
1028 if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) {
1030 CtdlLogPrintf(CTDL_EMERG,
1031 "Thread system, can't initialise CtdlThreadNode, exiting\n");
1036 * If we got here we are going to create the thread so we must initilise the structure
1037 * first because most implimentations of threading can't create it in a stopped state
1038 * and it might want to do things with its structure that aren't initialised otherwise.
1041 this_thread->name = name;
1043 this_thread->name = "Un-named Thread";
1046 this_thread->flags = flags;
1047 this_thread->thread_func = thread_func;
1048 this_thread->user_args = args;
1051 * When to start this thread
1053 this_thread->when = when;
1055 begin_critical_section(S_SCHEDULE_LIST);
1056 this_thread->next = CtdlThreadSchedList;
1057 CtdlThreadSchedList = this_thread;
1058 if (this_thread->next)
1059 this_thread->next->prev = this_thread;
1060 end_critical_section(S_SCHEDULE_LIST);
1067 CtdlThreadNode *ctdl_thread_internal_start_scheduled(CtdlThreadNode *
1072 // citthread_mutex_lock(&that_thread->ThreadMutex);
1073 begin_critical_section(S_THREAD_LIST);
1075 * We pass this_thread into the thread as its args so that it can find out information
1076 * about itself and it has a bit of storage space for itself, not to mention that the REAL
1077 * thread function needs to finish off the setup of the structure
1080 citthread_create(&this_thread->tid, &this_thread->attr,
1081 ctdl_internal_thread_func,
1082 this_thread) != 0)) {
1083 end_critical_section(S_THREAD_LIST);
1084 CtdlLogPrintf(CTDL_DEBUG,
1085 "Failed to start scheduled thread \"%s\": %s\n",
1086 this_thread->name, strerror(ret));
1087 // citthread_mutex_unlock(&this_thread->ThreadMutex);
1088 citthread_mutex_destroy(&(this_thread->ThreadMutex));
1089 citthread_cond_destroy(&(this_thread->ThreadCond));
1090 citthread_mutex_destroy(&(this_thread->SleepMutex));
1091 citthread_cond_destroy(&(this_thread->SleepCond));
1092 citthread_attr_destroy(&this_thread->attr);
1098 num_threads++; // Increase the count of threads in the system.
1099 if (this_thread->flags & CTDLTHREAD_WORKER)
1102 this_thread->next = CtdlThreadList;
1103 CtdlThreadList = this_thread;
1104 if (this_thread->next)
1105 this_thread->next->prev = this_thread;
1106 // citthread_mutex_unlock(&that_thread->ThreadMutex);
1108 ctdl_thread_internal_calc_loadavg();
1109 end_critical_section(S_THREAD_LIST);
1117 void ctdl_thread_internal_check_scheduled(void)
1119 CtdlThreadNode *this_thread, *that_thread;
1122 if (try_critical_section(S_SCHEDULE_LIST))
1123 return; /* If this list is locked we wait till the next chance */
1127 #ifdef WITH_THREADLOG
1128 CtdlLogPrintf(CTDL_DEBUG,
1129 "Checking for scheduled threads to start.\n");
1132 this_thread = CtdlThreadSchedList;
1133 while (this_thread) {
1134 that_thread = this_thread;
1135 this_thread = this_thread->next;
1137 if (now > that_thread->when) {
1138 /* Unlink from schedule list */
1139 if (that_thread->prev)
1140 that_thread->prev->next =
1143 CtdlThreadSchedList = that_thread->next;
1144 if (that_thread->next)
1145 that_thread->next->prev =
1148 that_thread->next = that_thread->prev = NULL;
1149 #ifdef WITH_THREADLOG
1150 CtdlLogPrintf(CTDL_DEBUG,
1151 "About to start scheduled thread \"%s\".\n",
1154 if (CT->state > CTDL_THREAD_STOP_REQ) { /* Only start it if the system is not stopping */
1155 if (ctdl_thread_internal_start_scheduled
1157 #ifdef WITH_THREADLOG
1158 CtdlLogPrintf(CTDL_INFO,
1159 "Thread system, Started a scheduled thread \"%s\" (%ud).\n",
1166 #ifdef WITH_THREADLOG
1168 CtdlLogPrintf(CTDL_DEBUG,
1169 "Thread \"%s\" will start in %ld seconds.\n",
1171 that_thread->when - time(NULL));
1175 end_critical_section(S_SCHEDULE_LIST);
1180 * A warapper function for select so we can show a thread as blocked
1182 int CtdlThreadSelect(int n, fd_set * readfds, fd_set * writefds,
1183 fd_set * exceptfds, struct timeval *timeout)
1187 ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1188 ret = select(n, readfds, writefds, exceptfds, timeout);
1189 ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
1195 void *new_worker_thread(void *arg);
1196 extern void close_masters(void);
1197 void *select_on_master(void *args);
1198 void *select_on_client(void *args);
1199 CtdlThreadNode *client_select_thread;
1200 CtdlThreadNode *master_select_thread;
1202 void go_threading(void)
1205 CtdlThreadNode *last_worker;
1208 * Initialise the thread system
1210 ctdl_thread_internal_init();
1212 /* Second call to module init functions now that threading is up */
1213 initialise_modules(1);
1216 CtdlThreadCreate("House keeping",
1217 CTDLTHREAD_BIGSTACK,
1218 do_housekeeping, NULL);
1222 master_select_thread = CtdlThreadCreate ("Select on Master", 0, select_on_master, NULL);
1223 client_select_thread = CtdlThreadCreate ("Select on client", 0, select_on_client, NULL);
1227 * This thread is now used for garbage collection of other threads in the thread list
1229 CtdlLogPrintf(CTDL_INFO,
1230 "Startup thread %d becoming garbage collector,\n",
1234 * We do a lot of locking and unlocking of the thread list in here.
1235 * We do this so that we can repeatedly release time for other threads
1236 * that may be waiting on the thread list.
1237 * We are a low priority thread so we can afford to do this
1240 while (CtdlThreadGetCount()) {
1242 exit_signal = CT->signal;
1244 CtdlThreadStopAll();
1247 check_sched_shutdown();
1248 if (CT->state > CTDL_THREAD_STOP_REQ) {
1249 begin_critical_section(S_THREAD_LIST);
1250 ctdl_thread_internal_calc_loadavg();
1251 end_critical_section(S_THREAD_LIST);
1253 ctdl_thread_internal_check_scheduled(); /* start scheduled threads */
1256 /* Reduce the size of the worker thread pool if necessary. */
1257 if ((CtdlThreadGetWorkers() > config.c_min_workers + 1)
1258 && (CtdlThreadWorkerAvg < 20)
1259 && (CT->state > CTDL_THREAD_STOP_REQ)) {
1260 /* Ask a worker thread to stop as we no longer need it */
1261 begin_critical_section(S_THREAD_LIST);
1262 last_worker = CtdlThreadList;
1263 while (last_worker) {
1264 citthread_mutex_lock(&last_worker->
1266 if (last_worker->flags & CTDLTHREAD_WORKER
1267 && (last_worker->state >
1268 CTDL_THREAD_STOPPING)
1269 && (last_worker->Context == NULL)) {
1270 citthread_mutex_unlock
1271 (&last_worker->ThreadMutex);
1274 citthread_mutex_unlock(&last_worker->
1276 last_worker = last_worker->next;
1278 end_critical_section(S_THREAD_LIST);
1280 #ifdef WITH_THREADLOG
1281 CtdlLogPrintf(CTDL_DEBUG,
1282 "Thread system, stopping excess worker thread \"%s\" (%ld).\n",
1286 CtdlThreadStop(last_worker);
1291 * If all our workers are working hard, start some more to help out
1294 /* FIXME: come up with a better way to dynamically alter the number of threads
1295 * based on the system load
1298 if ((((CtdlThreadGetWorkers() < config.c_max_workers)
1299 && (CtdlThreadGetWorkers() <= num_sessions))
1300 || CtdlThreadGetWorkers() < config.c_min_workers)
1301 && (CT->state > CTDL_THREAD_STOP_REQ))
1303 if ((((CtdlThreadGetWorkers() < config.c_max_workers)
1304 && (CtdlThreadGetWorkerAvg() > 60)
1305 && (CtdlThreadGetLoadAvg() < 90))
1306 || CtdlThreadGetWorkers() < config.c_min_workers)
1307 && (CT->state > CTDL_THREAD_STOP_REQ))
1308 #endif /* NEW_WORKER */
1310 for (i = 0; i < 5; i++) {
1312 CtdlThreadCreate("Worker Thread (new)",
1313 CTDLTHREAD_BIGSTACK +
1315 new_worker_thread, NULL);
1317 CtdlThreadCreate("Worker Thread",
1318 CTDLTHREAD_BIGSTACK +
1320 worker_thread, NULL);
1321 #endif /* NEW_WORKER */
1327 if (CtdlThreadGetCount() <= 1) // Shutting down clean up the garbage collector
1332 if (CtdlThreadGetCount())
1336 * If the above loop exits we must be shutting down since we obviously have no threads
1338 ctdl_thread_internal_cleanup();
1346 * Starting a new implimentation of a worker thread.
1347 * This new implimentation will be faster and do more work per thread.
1350 // TODO: need to sort out the thread states and signals
1351 // TODO: slect_on_master should not be a big stack thread.
1352 // TODO: slect_on_client should not be a big stack thread.
1353 // TODO: select_on_master is not a worker thread and should be blocked when in select
1354 // TODO: select_on_client is not a worker thread and should be blocked when in select
1356 * Select on master socket.
1357 * One specific thread comes in here and never leaves.
1358 * This thread blocks on select until something happens.
1359 * The select only returns if a new connection is made or the select is interrupted by some means.
1360 * We need to interrupt the select if the list of ServiceHook's changes or we are shutting down.
1361 * We should probably use a signal to interrupt the select is a ServiceHook is created.
1362 * When a ServiceHook is destroyed its socket will close which will awaken the select.
1364 void *select_on_master(void *arg)
1367 struct ServiceFunctionHook *serviceptr;
1368 int ssock; /* Descriptor for client socket */
1372 struct CitContext *con;
1376 while (!CtdlThreadCheckStop()) {
1377 CtdlThreadName("select_on_master");
1379 /* Initialize the fdset. */
1383 /* First, add the various master sockets to the fdset. */
1384 for (serviceptr = ServiceHookTable; serviceptr != NULL;
1385 serviceptr = serviceptr->next) {
1386 m = serviceptr->msock;
1387 FD_SET(m, &readfds);
1393 /** We can block indefinately since something will wake us up eventually
1394 * Even if it is a signal telling us the system is shutting down
1397 CtdlThreadSelect(highest + 1, &readfds, NULL, NULL,
1400 /** Select got an error or we are shutting down so get out */
1401 if (retval == 0 || CtdlThreadCheckStop()) {
1405 /** Select says something happened on one of our master sockets so now we handle it */
1406 for (serviceptr = ServiceHookTable; serviceptr != NULL;
1407 serviceptr = serviceptr->next) {
1408 if (FD_ISSET(serviceptr->msock, &readfds)) {
1409 ssock = accept(serviceptr->msock, NULL, 0);
1411 CtdlLogPrintf(CTDL_DEBUG,
1412 "New client socket %d\n",
1414 /* The master socket is non-blocking but the client
1415 * sockets need to be blocking, otherwise certain
1416 * operations barf on FreeBSD. Not a fatal error.
1418 if (fcntl(ssock, F_SETFL, 0) < 0) {
1419 CtdlLogPrintf(CTDL_EMERG,
1420 "citserver: Can't set socket to blocking: %s\n",
1424 /* New context will be created already
1425 * set up in the CON_EXECUTING state.
1427 con = CreateNewContext();
1428 /* Assign our new socket number to it. */
1429 con->client_socket = ssock;
1430 con->h_command_function =
1431 serviceptr->h_command_function;
1432 con->h_async_function =
1433 serviceptr->h_async_function;
1435 serviceptr->ServiceName;
1436 con->h_greeting_function = serviceptr->h_greeting_function;
1437 /* Determine whether it's a local socket */
1438 if (serviceptr->sockpath != NULL)
1439 con->is_local_socket = 1;
1441 /* Set the SO_REUSEADDR socket option */
1443 setsockopt(ssock, SOL_SOCKET,
1447 /** Now we can pass this context to an idle worker thread to get things going
1448 * What if there are no idle workers?
1449 * We could create one but what if the thread list is full?
1450 * Then I guess we need to close the socket a reject the connection.
1452 /** TODO: If there are no idle threads then this server is overloaded and we should reject the connection
1453 * This will have the effect of throttling the incomming connections on master sockets
1454 * a little and slow the process down.
1456 // if (idle_workers)
1458 con->state = CON_START;
1459 citthread_kill(client_select_thread->tid, SIGUSR1);
1460 citthread_cond_signal(&worker_block);
1463 // output try later message
1464 //start_context(con);
1475 * Select on client socket.
1476 * Only one dedicated thread in here.
1477 * We have to interrupt our select whenever a context is returned to the CON_READY state.
1478 * as a result each context may be close to timing out its client so we have to calculate
1479 * which client socket will timeout first and expire our select on that time.
1482 void *select_on_client(void *arg)
1485 struct timeval tv, now, result;
1488 struct CitContext *ptr;
1490 CtdlThreadName("select_on_client");
1492 while (!CtdlThreadCheckStop()) {
1493 /* Initialise the fdset */
1496 /** Get the clients to select on */
1497 tv.tv_sec = config.c_sleeping;
1499 begin_critical_section(S_SESSION_TABLE);
1500 for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
1501 if (ptr->state == CON_IDLE) {
1502 gettimeofday(&now, NULL);
1503 timersub(&(ptr->client_expires_at),
1505 if (result.tv_sec <= 0) {
1506 /** This client has timed out so kill it */
1510 /** Is this one going to expire first? */
1511 timersub(&result, &tv, &now);
1512 if (now.tv_sec <= 0 && now.tv_usec <= 0) {
1513 tv.tv_sec = result.tv_sec;
1514 tv.tv_usec = result.tv_usec;
1516 FD_SET(ptr->client_socket, &readfds);
1517 if (ptr->client_socket > highest)
1518 highest = ptr->client_socket;
1521 end_critical_section(S_SESSION_TABLE);
1523 /* Now we can select on any connections that are waiting */
1524 if (!CtdlThreadCheckStop()) {
1526 CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv);
1527 } else { /* Shutting down? */
1533 /* Now figure out who made this select() unblock.
1534 * First, check for an error or exit condition.
1537 if (errno == EBADF) {
1538 CtdlLogPrintf(CTDL_NOTICE,
1539 "select() failed: (%s)\n",
1542 if (errno != EINTR) {
1543 CtdlLogPrintf(CTDL_EMERG,
1546 CtdlThreadStopAll();
1547 } else if (!CtdlThreadCheckStop()) {
1548 CtdlLogPrintf(CTDL_DEBUG,
1549 "Un handled select failure.\n");
1551 } else if (retval > 0) {
1552 begin_critical_section(S_SESSION_TABLE);
1553 for (ptr = ContextList; ptr != NULL;
1556 (ptr->client_socket, &readfds))
1557 && (ptr->state == CON_IDLE)) {
1558 ptr->input_waiting = 1;
1559 ptr->state = CON_READY;
1560 /** reset client expire time */
1561 ptr->client_expires_at.tv_sec = config.c_sleeping;
1562 ptr->client_expires_at.tv_usec = 0;
1565 end_critical_section(S_SESSION_TABLE);
1574 * Do the worker threads work when needed
1576 int execute_session(struct CitContext *bind_me)
1580 become_session(bind_me);
1582 /* If the client has sent a command, execute it. */
1583 if (CC->input_waiting) {
1584 CC->h_command_function();
1585 CC->input_waiting = 0;
1588 /* If there are asynchronous messages waiting and the
1589 * client supports it, do those now */
1590 if ((CC->is_async) && (CC->async_waiting)
1591 && (CC->h_async_function != NULL)) {
1592 CC->h_async_function();
1593 CC->async_waiting = 0;
1596 force_purge = CC->kill_me;
1599 become_session(NULL);
1600 bind_me->state = CON_IDLE;
1606 extern void dead_session_purge(int force);
1609 * A new worker_thread loop.
1612 void *new_worker_thread(void *arg)
1614 struct CitContext *bind_me, *ptr;
1617 while (!CtdlThreadCheckStop()) {
1619 /* make doubly sure we're not holding any stale db handles
1620 * which might cause a deadlock.
1622 cdb_check_handles();
1624 bind_me = NULL; /* Which session shall we handle? */
1626 begin_critical_section(S_SESSION_TABLE);
1627 for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
1628 if (ptr->state == CON_START) {
1629 ptr->state = CON_EXECUTING;
1630 end_critical_section(S_SESSION_TABLE);
1631 become_session(ptr);
1633 ptr->h_greeting_function();
1634 become_session(NULL);
1635 ptr->state = CON_IDLE;
1638 if (ptr->state == CON_READY) {
1639 ptr->state = CON_EXECUTING;
1640 end_critical_section(S_SESSION_TABLE);
1641 force_purge = execute_session(ptr);
1646 end_critical_section(S_SESSION_TABLE);
1648 dead_session_purge(force_purge);
1650 /** block the worker threads waiting for a select to do something */
1652 ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1653 citthread_mutex_lock(&worker_block_mutex);
1654 citthread_cond_wait(&worker_block, &worker_block_mutex);
1655 citthread_mutex_unlock(&worker_block_mutex);
1656 ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
1659 if (CtdlThreadCheckStop())