2 * $Id: sysdep.c 5882 2007-12-13 19:46:05Z davew $
4 * Citadel "system dependent" stuff.
5 * See copyright.txt for copyright information.
7 * Here's where we have the Citadel thread implementation
14 #include "ctdl_module.h"
15 #include "modules_init.h"
16 #include "housekeeping.h"
20 * New thread interface.
21 * To create a thread you must call one of the create thread functions.
22 * You must pass it the address of (a pointer to a CtdlThreadNode initialised to NULL) like this
23 * struct CtdlThreadNode *node = NULL;
25 * If the thread is created *node will point to the thread control structure for the created thread.
26 * If the thread creation fails *node remains NULL
27 * Do not free the memory pointed to by *node, it doesn't belong to you.
28 * This new interface duplicates much of the eCrash stuff. We should go for closer integration since that would
29 * remove the need for the calls to eCrashRegisterThread and friends
/*
 * File-scope state of the thread subsystem:
 *  - num_threads / num_workers: running counts of threads and worker threads,
 *  - CtdlThreadList / CtdlThreadSchedList: heads of the doubly linked lists of
 *    started threads and of threads waiting for their scheduled start time,
 *  - GC_thread: pthread id recorded by ctdl_thread_internal_init(),
 *  - CtdlThreadStates[]: state names, filled with strdup() copies at init,
 *  - CtdlThreadLoadAvg / CtdlThreadWorkerAvg: results of the last
 *    ctdl_thread_internal_calc_loadavg() run,
 *  - ThreadKey: key for the thread-specific data,
 *  - Critters[]: one mutex per critical-section id.
 * The thread_gc_mutex / thread_gc_cond pair below is commented out.
 */
32 static int num_threads = 0; /* Current number of threads */
33 static int num_workers = 0; /* Current number of worker threads */
35 struct CtdlThreadNode *CtdlThreadList = NULL;
36 struct CtdlThreadNode *CtdlThreadSchedList = NULL;
39 * Condition variable and Mutex for thread garbage collection
41 /*static pthread_mutex_t thread_gc_mutex = PTHREAD_MUTEX_INITIALIZER;
42 static pthread_cond_t thread_gc_cond = PTHREAD_COND_INITIALIZER;
44 static pthread_t GC_thread;
45 static char *CtdlThreadStates[CTDL_THREAD_LAST_STATE];
46 double CtdlThreadLoadAvg = 0;
47 double CtdlThreadWorkerAvg = 0;
48 pthread_key_t ThreadKey;
50 pthread_mutex_t Critters[MAX_SEMAPHORES]; /* Things needing locking */
54 void InitialiseSemaphores(void)
58 /* Set up a bunch of semaphores to be used for critical sections */
59 for (i=0; i<MAX_SEMAPHORES; ++i) {
60 pthread_mutex_init(&Critters[i], NULL);
68 * Obtain a semaphore lock to begin a critical section.
69 * but only if no one else has one
/*
 * Non-blocking counterpart of begin_critical_section().
 *
 * which_one: index of the critical section, used to select Critters[which_one].
 * Returns the result of pthread_mutex_trylock() on that mutex, i.e. 0 when the
 * lock was obtained and non-zero when it was not.
 *
 * The condition below selects every section id other than S_FLOORCACHE,
 * S_RPLIST and (only with DEBUG_MEMORY_LEAKS) S_DEBUGMEMLEAKS.
 * NOTE(review): the statement guarded by that condition is not visible in this
 * excerpt; per the comment it is a check against locking inside a transaction.
 */
71 int try_critical_section(int which_one)
73 /* For all types of critical sections except those listed here,
74 * ensure nobody ever tries to do a critical section within a
75 * transaction; this could lead to deadlock.
77 if ( (which_one != S_FLOORCACHE)
78 #ifdef DEBUG_MEMORY_LEAKS
79 && (which_one != S_DEBUGMEMLEAKS)
81 && (which_one != S_RPLIST)
85 return (pthread_mutex_trylock(&Critters[which_one]));
90 * Obtain a semaphore lock to begin a critical section.
/*
 * Enter a critical section, blocking until it is available.
 *
 * which_one: index of the critical section, used to select Critters[which_one].
 * The mutex is taken with pthread_mutex_lock(); its return value is not
 * examined in the visible code.
 *
 * As in try_critical_section(), the condition below selects every section id
 * other than S_FLOORCACHE, S_RPLIST and (only with DEBUG_MEMORY_LEAKS)
 * S_DEBUGMEMLEAKS.
 * NOTE(review): the statement guarded by that condition is not visible in this
 * excerpt.
 */
92 void begin_critical_section(int which_one)
94 /* CtdlLogPrintf(CTDL_DEBUG, "begin_critical_section(%d)\n", which_one); */
96 /* For all types of critical sections except those listed here,
97 * ensure nobody ever tries to do a critical section within a
98 * transaction; this could lead to deadlock.
100 if ( (which_one != S_FLOORCACHE)
101 #ifdef DEBUG_MEMORY_LEAKS
102 && (which_one != S_DEBUGMEMLEAKS)
104 && (which_one != S_RPLIST)
108 pthread_mutex_lock(&Critters[which_one]);
112 * Release a semaphore lock to end a critical section.
114 void end_critical_section(int which_one)
116 pthread_mutex_unlock(&Critters[which_one]);
121 * A function to destroy the TSD
/*
 * Destructor for the thread-specific data kept under ThreadKey.
 * It is registered through pthread_key_create() in
 * ctdl_thread_internal_init_tsd() and is also called directly by
 * ctdl_thread_internal_free_tsd() with the calling thread's current value.
 *
 * arg: the thread-specific value to dispose of.
 * NOTE(review): the body of this function is not visible in this excerpt.
 */
123 static void ctdl_thread_internal_dest_tsd(void *arg)
133 * A function to initialise the thread TSD
/*
 * Create ThreadKey, the key under which each thread stores its private data,
 * and register ctdl_thread_internal_dest_tsd() as the key's destructor.
 * A non-zero result from pthread_key_create() is logged at CTDL_EMERG level.
 * NOTE(review): the rest of the error branch is not visible in this excerpt.
 * NOTE(review): this is the only visible use of lprintf(); the rest of the
 * file logs through CtdlLogPrintf() - confirm which one is intended.
 */
135 void ctdl_thread_internal_init_tsd(void)
139 if ((ret = pthread_key_create(&ThreadKey, ctdl_thread_internal_dest_tsd))) {
140 lprintf(CTDL_EMERG, "pthread_key_create: %s\n",
147 * Ensure that we have a key for thread-specific data.
149 * This should be called immediately after startup by any thread
/*
 * Make sure the calling thread has a ThreadTSD stored under ThreadKey.
 * The existing value is tested first; a new ThreadTSD is obtained with
 * malloc(), its cursors array is zeroed and the pointer is installed with
 * pthread_setspecific().
 * NOTE(review): the statement taken when a value already exists (presumably an
 * early return), any malloc() failure handling and any further field
 * initialisation are not visible in this excerpt.
 */
152 void CtdlThreadAllocTSD(void)
156 if (pthread_getspecific(ThreadKey) != NULL)
159 tsd = malloc(sizeof(ThreadTSD));
163 memset(tsd->cursors, 0, sizeof tsd->cursors);
166 pthread_setspecific(ThreadKey, tsd);
170 void ctdl_thread_internal_free_tsd(void)
172 ctdl_thread_internal_dest_tsd(pthread_getspecific(ThreadKey));
173 pthread_setspecific(ThreadKey, NULL);
/*
 * Final tear-down of the thread system; go_threading() calls it once its
 * main loop has ended.
 *
 * Steps visible here:
 *  1. free() every entry of CtdlThreadStates[] (they were strdup()'d by
 *     ctdl_thread_internal_init()),
 *  2. walk CtdlThreadSchedList, i.e. the threads that were scheduled but never
 *     started, destroying each node's two mutexes, two condition variables and
 *     its pthread attribute object,
 *  3. release the calling thread's thread-specific data.
 * NOTE(review): the loop header and whether the node memory itself is freed
 * are not visible in this excerpt.
 */
177 void ctdl_thread_internal_cleanup(void)
180 struct CtdlThreadNode *this_thread, *that_thread;
182 for (i=0; i<CTDL_THREAD_LAST_STATE; i++)
184 free (CtdlThreadStates[i]);
187 /* Clean up the scheduled thread list */
188 this_thread = CtdlThreadSchedList;
191 that_thread = this_thread;
192 this_thread = this_thread->next;
193 pthread_mutex_destroy(&that_thread->ThreadMutex);
194 pthread_cond_destroy(&that_thread->ThreadCond);
195 pthread_mutex_destroy(&that_thread->SleepMutex);
196 pthread_cond_destroy(&that_thread->SleepCond);
197 pthread_attr_destroy(&that_thread->attr);
200 ctdl_thread_internal_free_tsd();
/*
 * Bring up the thread system; called by go_threading() from the startup
 * thread, which thereby becomes the garbage collector thread.
 *
 * Steps visible here:
 *  - record pthread_self() in GC_thread,
 *  - fill CtdlThreadStates[] with strdup() copies of the state names
 *    (the strdup() results are not checked),
 *  - allocate a CtdlThreadNode for the calling thread, zero it and initialise
 *    its mutexes, condition variables and pthread attribute object,
 *  - mark the node CTDL_THREAD_RUNNING, name it, store its tid, count it in
 *    num_threads and push it on the head of CtdlThreadList,
 *  - stamp start_time and last_state_change with the current time.
 *
 * Because the node is zeroed and thread_func is never assigned here, this
 * node's thread_func stays NULL; other functions in this file test for a NULL
 * thread_func to recognise the garbage collector.
 * NOTE(review): the bodies of the two error branches (after the EMERG log
 * calls) are not visible in this excerpt, and the list update is not wrapped
 * in S_THREAD_LIST in the visible lines.
 */
203 void ctdl_thread_internal_init(void)
205 struct CtdlThreadNode *this_thread;
208 GC_thread = pthread_self();
209 CtdlThreadStates[CTDL_THREAD_INVALID] = strdup ("Invalid Thread");
210 CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread");
211 CtdlThreadStates[CTDL_THREAD_CREATE] = strdup("Thread being Created");
212 CtdlThreadStates[CTDL_THREAD_CANCELLED] = strdup("Thread Cancelled");
213 CtdlThreadStates[CTDL_THREAD_EXITED] = strdup("Thread Exited");
214 CtdlThreadStates[CTDL_THREAD_STOPPING] = strdup("Thread Stopping");
215 CtdlThreadStates[CTDL_THREAD_STOP_REQ] = strdup("Thread Stop Requested");
216 CtdlThreadStates[CTDL_THREAD_SLEEPING] = strdup("Thread Sleeping");
217 CtdlThreadStates[CTDL_THREAD_RUNNING] = strdup("Thread Running");
218 CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked");
220 /* Get ourself a thread entry */
221 this_thread = malloc(sizeof(struct CtdlThreadNode));
222 if (this_thread == NULL) {
223 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
226 // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
227 memset (this_thread, 0, sizeof(struct CtdlThreadNode));
229 pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
230 pthread_cond_init (&(this_thread->ThreadCond), NULL);
231 pthread_mutex_init (&(this_thread->SleepMutex), NULL);
232 pthread_cond_init (&(this_thread->SleepCond), NULL);
234 /* We are garbage collector so create us as running */
235 this_thread->state = CTDL_THREAD_RUNNING;
237 if ((ret = pthread_attr_init(&this_thread->attr))) {
238 CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_init: %s\n", strerror(ret));
243 this_thread->name = "Garbage Collection Thread";
245 this_thread->tid = GC_thread;
248 num_threads++; // Increase the count of threads in the system.
250 this_thread->next = CtdlThreadList;
251 CtdlThreadList = this_thread;
252 if (this_thread->next)
253 this_thread->next->prev = this_thread;
254 /* Set up start times */
255 gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */
256 memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval)); /* Changed state so mark it. */
261 * A function to update a threads load averages
263 void ctdl_thread_internal_update_avgs(struct CtdlThreadNode *this_thread)
265 struct timeval now, result;
266 double last_duration;
268 gettimeofday(&now, NULL);
269 timersub(&now, &(this_thread->last_state_change), &result);
270 pthread_mutex_lock(&this_thread->ThreadMutex);
271 // result now has a timeval for the time we spent in the last state since we last updated
272 last_duration = (double)result.tv_sec + ((double)result.tv_usec / (double) 1000000);
273 if (this_thread->state == CTDL_THREAD_SLEEPING)
274 this_thread->avg_sleeping += last_duration;
275 if (this_thread->state == CTDL_THREAD_RUNNING)
276 this_thread->avg_running += last_duration;
277 if (this_thread->state == CTDL_THREAD_BLOCKED)
278 this_thread->avg_blocked += last_duration;
279 memcpy (&this_thread->last_state_change, &now, sizeof (struct timeval));
280 pthread_mutex_unlock(&this_thread->ThreadMutex);
284 * A function to change the state of a thread
286 void ctdl_thread_internal_change_state (struct CtdlThreadNode *this_thread, enum CtdlThreadState new_state)
289 * Wether we change state or not we need update the load values
291 ctdl_thread_internal_update_avgs(this_thread);
292 pthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */
293 if ((new_state == CTDL_THREAD_STOP_REQ) && (this_thread->state > CTDL_THREAD_STOP_REQ))
294 this_thread->state = new_state;
295 if (((new_state == CTDL_THREAD_SLEEPING) || (new_state == CTDL_THREAD_BLOCKED)) && (this_thread->state == CTDL_THREAD_RUNNING))
296 this_thread->state = new_state;
297 if ((new_state == CTDL_THREAD_RUNNING) && ((this_thread->state == CTDL_THREAD_SLEEPING) || (this_thread->state == CTDL_THREAD_BLOCKED)))
298 this_thread->state = new_state;
299 pthread_mutex_unlock(&this_thread->ThreadMutex);
304 * A function to tell all threads to exit
/*
 * Ask every thread on CtdlThreadList to stop.
 *
 * With S_THREAD_LIST held, each node is (when THREADS_USESIGNALS is defined)
 * sent SIGHUP, has a transition to CTDL_THREAD_STOP_REQ requested, and has
 * both its ThreadCond and SleepCond signalled so that a waiting thread wakes
 * up; a debug line is logged per thread.
 *
 * The FIXME below records that the condition variables are signalled while
 * the list lock is still held.
 * NOTE(review): the loop header is not visible in this excerpt. The log call
 * prints this_thread->tid with %ld; assumes the tid type is compatible with
 * long on the target platform - confirm.
 */
306 void CtdlThreadStopAll(void)
308 //FIXME: The signalling of the condition should not be in the critical_section
309 // We need to build a list of threads we are going to signal and then signal them afterwards
311 struct CtdlThreadNode *this_thread;
313 begin_critical_section(S_THREAD_LIST);
314 this_thread = CtdlThreadList;
317 #ifdef THREADS_USESIGNALS
318 pthread_kill(this_thread->tid, SIGHUP);
320 ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ);
321 pthread_cond_signal(&this_thread->ThreadCond);
322 pthread_cond_signal(&this_thread->SleepCond);
323 CtdlLogPrintf(CTDL_DEBUG, "Thread system stopping thread \"%s\" (%ld).\n", this_thread->name, this_thread->tid);
324 this_thread = this_thread->next;
326 end_critical_section(S_THREAD_LIST);
331 * A function to wake up all sleeping threads
/*
 * Walk CtdlThreadList under S_THREAD_LIST and signal the ThreadCond and
 * SleepCond condition variables of thread nodes.
 *
 * NOTE(review): the signalling follows a test for a NULL thread_func, which
 * elsewhere in this file identifies the garbage collector node. The braces
 * around that test are not visible in this excerpt, so it cannot be confirmed
 * from here whether all threads are signalled or only nodes without a
 * thread_func - verify against the full source.
 */
333 void CtdlThreadWakeAll(void)
335 struct CtdlThreadNode *this_thread;
337 CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n");
339 begin_critical_section(S_THREAD_LIST);
340 this_thread = CtdlThreadList;
343 if (!this_thread->thread_func)
345 pthread_cond_signal(&this_thread->ThreadCond);
346 pthread_cond_signal(&this_thread->SleepCond);
348 this_thread = this_thread->next;
350 end_critical_section(S_THREAD_LIST);
355 * A function to return the number of threads running in the system
/*
 * Report how many threads are in the system; go_threading() keeps running
 * its main loop for as long as this returns non-zero.
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * returns num_threads - confirm.
 */
357 int CtdlThreadGetCount(void)
/*
 * Report how many worker threads are in the system; go_threading() compares
 * the result with config.c_min_workers and config.c_max_workers.
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * returns num_workers - confirm.
 */
362 int CtdlThreadGetWorkers(void)
367 double CtdlThreadGetWorkerAvg(void)
371 begin_critical_section(S_THREAD_LIST);
372 ret = CtdlThreadWorkerAvg;
373 end_critical_section(S_THREAD_LIST);
377 double CtdlThreadGetLoadAvg(void)
381 begin_critical_section(S_THREAD_LIST);
382 ret = CtdlThreadLoadAvg;
383 end_critical_section(S_THREAD_LIST);
391 * A function to rename a thread
392 * Returns a const char *
/*
 * Rename the calling thread.
 *
 * name: the new name.
 * Returns a const char *; a local called old_name is declared, so presumably
 * the previous name is returned - the assignments and the return statement
 * are not visible in this excerpt.
 *
 * A warning is logged when the caller is "a non thread"; otherwise the update
 * happens between the lock and unlock of CT->ThreadMutex.
 * NOTE(review): the warning text names CtdlThreadRename() although this
 * function is CtdlThreadName().
 */
394 const char *CtdlThreadName(const char *name)
396 const char *old_name;
400 CtdlLogPrintf(CTDL_WARNING, "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n", name);
403 // FIXME: do we need this lock? I think not since the pointer asignmaent should be atomic
404 pthread_mutex_lock(&CT->ThreadMutex);
408 pthread_mutex_unlock(&CT->ThreadMutex);
414 * A function to force a thread to exit
/*
 * Force a thread to exit with pthread_cancel().
 *
 * thread: node of the thread to cancel; the visible code copies it into
 *         this_thread (how a NULL argument is resolved is not visible here).
 *
 * An EMERG message is logged for "a non thread" and for an attempt to cancel
 * the garbage collector, which is recognised by a NULL thread_func.
 * NOTE(review): the statements following those log calls are not visible in
 * this excerpt.
 * NOTE(review): the request for CTDL_THREAD_CANCELLED matches none of the
 * transitions implemented by ctdl_thread_internal_change_state(), so as
 * written that call only refreshes the load accounting and leaves the state
 * unchanged - confirm whether that is intended.
 */
416 void CtdlThreadCancel(struct CtdlThreadNode *thread)
418 struct CtdlThreadNode *this_thread;
423 this_thread = thread;
426 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n");
431 if (!this_thread->thread_func)
433 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n");
438 ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_CANCELLED);
439 pthread_cancel(this_thread->tid);
445 * A function for a thread to check if it has been asked to stop
/*
 * Let the calling thread find out whether it has been asked to stop.
 * Callers in this file treat a zero result as "keep going".
 *
 * With CT->ThreadMutex held:
 *  - a state of CTDL_THREAD_STOP_REQ is advanced to CTDL_THREAD_STOPPING,
 *  - a state strictly between CTDL_THREAD_CREATE and CTDL_THREAD_STOP_REQ
 *    takes the second branch,
 *  - any other state falls through to the final unlock.
 * NOTE(review): the return statements, the assignment of the local `state`
 * and the handling after the "non thread" EMERG message are not visible in
 * this excerpt.
 * NOTE(review): the preprocessor guard here is THREADS_USERSIGNALS while
 * CtdlThreadStop() and CtdlThreadStopAll() test THREADS_USESIGNALS; one of
 * the two spellings looks like a typo - confirm.
 */
447 int CtdlThreadCheckStop(void)
453 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n");
460 #ifdef THREADS_USERSIGNALS
462 CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" caught signal %d.\n", CT->name, CT->signal);
464 pthread_mutex_lock(&CT->ThreadMutex);
465 if(state == CTDL_THREAD_STOP_REQ)
467 CT->state = CTDL_THREAD_STOPPING;
468 pthread_mutex_unlock(&CT->ThreadMutex);
471 else if((state < CTDL_THREAD_STOP_REQ) && (state > CTDL_THREAD_CREATE))
473 pthread_mutex_unlock(&CT->ThreadMutex);
476 pthread_mutex_unlock(&CT->ThreadMutex);
482 * A function to ask a thread to exit
483 * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit
/*
 * Ask one thread to stop. The target only notices the request when it next
 * calls CtdlThreadCheckStop().
 *
 * thread: node of the thread to stop; the visible code copies it into
 *         this_thread (how a NULL argument is resolved is not visible here).
 *
 * A node with a NULL thread_func is the garbage collector and is left alone.
 * Otherwise the thread is (when THREADS_USESIGNALS is defined) sent SIGHUP,
 * a transition to CTDL_THREAD_STOP_REQ is requested and both of its condition
 * variables are signalled so that a sleeping thread wakes up.
 */
485 void CtdlThreadStop(struct CtdlThreadNode *thread)
487 struct CtdlThreadNode *this_thread;
492 this_thread = thread;
495 if (!(this_thread->thread_func))
496 return; // Don't stop garbage collector
497 #ifdef THREADS_USESIGNALS
498 pthread_kill(this_thread->tid, SIGHUP);
500 ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ);
501 pthread_cond_signal(&this_thread->ThreadCond);
502 pthread_cond_signal(&this_thread->SleepCond);
506 * So we now have a sleep command that works with threads but it is in seconds
508 void CtdlThreadSleep(int secs)
510 struct timespec wake_time;
511 struct timeval time_now;
516 CtdlLogPrintf(CTDL_WARNING, "CtdlThreadSleep() called by something that is not a thread. Should we die?\n");
520 memset (&wake_time, 0, sizeof(struct timespec));
521 gettimeofday(&time_now, NULL);
522 wake_time.tv_sec = time_now.tv_sec + secs;
523 wake_time.tv_nsec = time_now.tv_usec * 10;
525 ctdl_thread_internal_change_state (CT, CTDL_THREAD_SLEEPING);
527 pthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */
528 pthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex, &wake_time);
529 pthread_mutex_unlock(&CT->ThreadMutex);
531 ctdl_thread_internal_change_state (CT, CTDL_THREAD_RUNNING);
536 * Routine to clean up our thread function on exit
/*
 * Cleanup handler that ctdl_internal_thread_func() installs with
 * pthread_cleanup_push(); it runs in the context of the exiting thread.
 *
 * arg: not used by the visible code (the handler is registered with NULL).
 *
 * It logs the exit, unregisters the thread from eCrash when HAVE_BACKTRACE is
 * defined, and finally - under the thread's own ThreadMutex - marks the node
 * CTDL_THREAD_EXITED. That assignment is deliberately last: the garbage
 * collector only unlinks nodes in the EXITED state.
 * NOTE(review): CT->tid is printed with %ld; assumes the tid type is
 * compatible with long on the target platform - confirm.
 */
538 static void ctdl_internal_thread_cleanup(void *arg)
541 * In here we were called by the current thread because it is exiting
542 * NB. WE ARE THE CURRENT THREAD
544 CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n", CT->name, CT->tid);
546 #ifdef HAVE_BACKTRACE
547 eCrash_UnregisterThread();
550 pthread_mutex_lock(&CT->ThreadMutex);
551 CT->state = CTDL_THREAD_EXITED; // needs to be last thing else house keeping will unlink us too early
552 pthread_mutex_unlock(&CT->ThreadMutex);
556 * A quick function to show the load averages
/*
 * Recompute the load figures for every thread on CtdlThreadList and the two
 * global averages.
 *
 * For each node: the time-in-state accumulators are refreshed, load_avg is
 * set to the running share of (sleeping + running + blocked) as a percentage,
 * and the three accumulators are then halved so that older samples decay.
 * The per-thread values are summed into load_avg and, for nodes flagged
 * CTDLTHREAD_WORKER, into worker_avg. Finally CtdlThreadLoadAvg is the sum
 * divided by num_threads and CtdlThreadWorkerAvg the worker sum divided by
 * the local `workers` count.
 *
 * The visible callers hold S_THREAD_LIST around the call; this function does
 * not take that section itself.
 * NOTE(review): the loop header and the initialisation of load_avg,
 * worker_avg and workers are not visible in this excerpt.
 * NOTE(review): nothing visible guards the divisions. If no node carries the
 * CTDLTHREAD_WORKER flag, worker_avg/workers divides by zero, and a node whose
 * three accumulators are all zero makes the per-thread division 0/0 - confirm
 * whether a guard is wanted.
 */
558 void ctdl_thread_internal_calc_loadavg(void)
560 struct CtdlThreadNode *that_thread;
561 double load_avg, worker_avg;
564 that_thread = CtdlThreadList;
569 /* Update load averages */
570 ctdl_thread_internal_update_avgs(that_thread);
571 pthread_mutex_lock(&that_thread->ThreadMutex);
572 that_thread->load_avg = that_thread->avg_sleeping + that_thread->avg_running + that_thread->avg_blocked;
573 that_thread->load_avg = that_thread->avg_running / that_thread->load_avg * 100;
574 that_thread->avg_sleeping /= 2;
575 that_thread->avg_running /= 2;
576 that_thread->avg_blocked /= 2;
577 load_avg += that_thread->load_avg;
578 if (that_thread->flags & CTDLTHREAD_WORKER)
580 worker_avg += that_thread->load_avg;
583 #ifdef WITH_THREADLOG
584 CtdlLogPrintf(CTDL_DEBUG, "CtdlThread, \"%s\" (%ld) \"%s\" %f %f %f %f.\n",
587 CtdlThreadStates[that_thread->state],
588 that_thread->avg_sleeping,
589 that_thread->avg_running,
590 that_thread->avg_blocked,
591 that_thread->load_avg);
593 pthread_mutex_unlock(&that_thread->ThreadMutex);
594 that_thread = that_thread->next;
596 CtdlThreadLoadAvg = load_avg/num_threads;
597 CtdlThreadWorkerAvg = worker_avg/workers;
598 #ifdef WITH_THREADLOG
599 CtdlLogPrintf(CTDL_INFO, "System load average %f, workers averag %f, threads %d, workers %d, sessions %d\n", CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads, num_workers, num_sessions);
605 * Garbage collection routine.
606 * Gets called by main() in a loop to clean up the thread list periodically.
/*
 * Garbage-collect the thread list: remove and dispose of every node whose
 * state is CTDL_THREAD_EXITED. The whole pass runs inside S_THREAD_LIST.
 *
 * For each node on CtdlThreadList (the next pointer is saved before the node
 * is examined, so unlinking the current node does not disturb the walk):
 *  - a node that has not exited is only counted (if it is a worker) for the
 *    sanity check at the end, and is otherwise skipped,
 *  - it is treated as a PANIC if a thread with a thread_func tries to collect
 *    its own node, or if num_threads is already <= 0,
 *  - the node is unlinked from the doubly linked list and num_workers is
 *    decremented for worker nodes,
 *  - both condition variables are signalled and ThreadMutex is locked and
 *    unlocked once more so that the thread has finished with the node,
 *  - pthread_join() reaps the thread; its result is only logged,
 *  - the node's mutexes, condition variables and attr object are destroyed.
 *
 * Afterwards the counted workers are compared with num_workers and a
 * mismatch is reported at EMERG level.
 * NOTE(review): the condition under which the list head is marked EXITED
 * ("Handle exiting of garbage collector thread"), the statements following
 * the PANIC messages, the num_threads decrement and the release of the node
 * memory are not visible in this excerpt.
 */
608 void CtdlThreadGC (void)
610 struct CtdlThreadNode *this_thread, *that_thread;
614 begin_critical_section(S_THREAD_LIST);
616 /* Handle exiting of garbage collector thread */
618 CtdlThreadList->state = CTDL_THREAD_EXITED;
620 #ifdef WITH_THREADLOG
621 CtdlLogPrintf(CTDL_DEBUG, "Thread system running garbage collection.\n");
624 * Woke up to do garbage collection
626 this_thread = CtdlThreadList;
629 that_thread = this_thread;
630 this_thread = this_thread->next;
632 /* Do we need to clean up this thread? */
633 pthread_mutex_lock(&that_thread->ThreadMutex);
634 if (that_thread->state != CTDL_THREAD_EXITED)
636 if(that_thread->flags & CTDLTHREAD_WORKER)
637 workers++; /* Sanity check on number of worker threads */
638 pthread_mutex_unlock(&that_thread->ThreadMutex);
642 if (pthread_equal(that_thread->tid, pthread_self()) && that_thread->thread_func)
644 pthread_mutex_unlock(&that_thread->ThreadMutex);
645 end_critical_section(S_THREAD_LIST);
646 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, a thread is trying to clean up after itself.\n");
651 if (num_threads <= 0)
653 pthread_mutex_unlock(&that_thread->ThreadMutex);
654 end_critical_section(S_THREAD_LIST);
655 CtdlLogPrintf(CTDL_EMERG, "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n");
660 if(that_thread->flags & CTDLTHREAD_WORKER)
661 num_workers--; /* This is a wroker thread so reduce the count. */
663 /* If we are unlinking the list head then the next becomes the list head */
664 if(that_thread->prev)
665 that_thread->prev->next = that_thread->next;
667 CtdlThreadList = that_thread->next;
668 if(that_thread->next)
669 that_thread->next->prev = that_thread->prev;
671 pthread_mutex_unlock(&that_thread->ThreadMutex);
672 pthread_cond_signal(&that_thread->ThreadCond);
673 pthread_cond_signal(&that_thread->SleepCond); // Make sure this thread is awake
674 pthread_mutex_lock(&that_thread->ThreadMutex); // Make sure it has done what its doing
675 pthread_mutex_unlock(&that_thread->ThreadMutex);
677 * Join on the thread to do clean up and prevent memory leaks
678 * Also makes sure the thread has cleaned up after itself before we remove it from the list
679 * We can join on the garbage collector thread the join should just return EDEADLCK
681 ret = pthread_join (that_thread->tid, NULL);
683 CtdlLogPrintf(CTDL_DEBUG, "Garbage collection on own thread.\n");
684 else if (ret == EINVAL)
685 CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, that thread already joined on.\n");
686 else if (ret == ESRCH)
687 CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, no thread to join on.\n");
689 CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, pthread_join returned an unknown error.\n");
691 * Now we own that thread entry
693 CtdlLogPrintf(CTDL_INFO, "Garbage Collection for thread \"%s\" (%ld).\n", that_thread->name, that_thread->tid);
694 pthread_mutex_destroy(&that_thread->ThreadMutex);
695 pthread_cond_destroy(&that_thread->ThreadCond);
696 pthread_mutex_destroy(&that_thread->SleepMutex);
697 pthread_cond_destroy(&that_thread->SleepCond);
698 pthread_attr_destroy(&that_thread->attr);
702 /* Sanity check number of worker threads */
703 if (workers != num_workers)
705 end_critical_section(S_THREAD_LIST);
706 CtdlLogPrintf(CTDL_EMERG,
707 "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n",
712 end_critical_section(S_THREAD_LIST);
719 * Runtime function for a Citadel Thread.
720 * This initialises the threads environment and then calls the user supplied thread function
721 * Note that this is the REAL thread function and wraps the users thread function.
/*
 * The start routine handed to pthread_create() for every Citadel thread; it
 * wraps the user supplied thread function.
 *
 * arg: the thread's own struct CtdlThreadNode.
 *
 * Sequence:
 *  1. enter S_THREAD_LIST - the creator holds that section while it finishes
 *     setting up the node, so the new thread waits here until then,
 *  2. record start_time, install ctdl_internal_thread_cleanup() as the
 *     cleanup handler, allocate the thread-specific data, store the pid and
 *     initialise last_state_change,
 *  3. move to CTDL_THREAD_RUNNING only if no stop was requested during
 *     creation, then leave S_THREAD_LIST,
 *  4. register with eCrash (HAVE_BACKTRACE) and log the creation,
 *  5. call the user function with user_args unless a stop has been requested
 *     in the meantime,
 *  6. pthread_cleanup_pop(1) runs the cleanup handler on the way out.
 * NOTE(review): the declaration of ret and the return statement are not
 * visible in this excerpt.
 */
723 static void *ctdl_internal_thread_func (void *arg)
725 struct CtdlThreadNode *this_thread;
728 /* lock and unlock the thread list.
729 * This causes this thread to wait until all its creation stuff has finished before it
730 * can continue its execution.
732 begin_critical_section(S_THREAD_LIST);
733 this_thread = (struct CtdlThreadNode *) arg;
734 gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */
735 pthread_mutex_lock(&this_thread->ThreadMutex);
737 // Register the cleanup function to take care of when we exit.
738 pthread_cleanup_push(ctdl_internal_thread_cleanup, NULL);
739 // Get our thread data structure
740 CtdlThreadAllocTSD();
742 this_thread->pid = getpid();
743 memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval)); /* Changed state so mark it. */
744 /* Only change to running state if we weren't asked to stop during the create cycle
745 * Other wise there is a window to allow this threads creation to continue to full grown and
746 * therby prevent a shutdown of the server.
748 pthread_mutex_unlock(&this_thread->ThreadMutex);
750 if (!CtdlThreadCheckStop())
752 pthread_mutex_lock(&this_thread->ThreadMutex);
753 this_thread->state = CTDL_THREAD_RUNNING;
754 pthread_mutex_unlock(&this_thread->ThreadMutex);
756 end_critical_section(S_THREAD_LIST);
758 // Register for tracing
759 #ifdef HAVE_BACKTRACE
760 eCrash_RegisterThread(this_thread->name, 0);
763 // Tell the world we are here
764 CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n", this_thread->name, this_thread->tid);
769 * run the thread to do the work but only if we haven't been asked to stop
771 if (!CtdlThreadCheckStop())
772 ret = (this_thread->thread_func)(this_thread->user_args);
775 * Our thread is exiting either because it wanted to end or because the server is stopping
776 * We need to clean up
778 pthread_cleanup_pop(1); // Execute our cleanup routine and remove it
786 * Internal function to create a thread.
787 * Must be called from within a S_THREAD_LIST critical section
789 struct CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args)
792 struct CtdlThreadNode *this_thread;
794 if (num_threads >= 32767)
796 CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n");
800 this_thread = malloc(sizeof(struct CtdlThreadNode));
801 if (this_thread == NULL) {
802 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
805 // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
806 memset (this_thread, 0, sizeof(struct CtdlThreadNode));
808 /* Create the mutex's early so we can use them */
809 pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
810 pthread_cond_init (&(this_thread->ThreadCond), NULL);
811 pthread_mutex_init (&(this_thread->SleepMutex), NULL);
812 pthread_cond_init (&(this_thread->SleepCond), NULL);
814 pthread_mutex_lock(&this_thread->ThreadMutex);
816 this_thread->state = CTDL_THREAD_CREATE;
818 if ((ret = pthread_attr_init(&this_thread->attr))) {
819 pthread_mutex_unlock(&this_thread->ThreadMutex);
820 pthread_mutex_destroy(&(this_thread->ThreadMutex));
821 pthread_cond_destroy(&(this_thread->ThreadCond));
822 pthread_mutex_destroy(&(this_thread->SleepMutex));
823 pthread_cond_destroy(&(this_thread->SleepCond));
824 CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_init: %s\n", strerror(ret));
829 /* Our per-thread stacks need to be bigger than the default size,
830 * otherwise the MIME parser crashes on FreeBSD, and the IMAP service
831 * crashes on 64-bit Linux.
833 if (flags & CTDLTHREAD_BIGSTACK)
835 #ifdef WITH_THREADLOG
836 CtdlLogPrintf(CTDL_INFO, "Thread system. Creating BIG STACK thread.\n");
838 if ((ret = pthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) {
839 pthread_mutex_unlock(&this_thread->ThreadMutex);
840 pthread_mutex_destroy(&(this_thread->ThreadMutex));
841 pthread_cond_destroy(&(this_thread->ThreadCond));
842 pthread_mutex_destroy(&(this_thread->SleepMutex));
843 pthread_cond_destroy(&(this_thread->SleepCond));
844 pthread_attr_destroy(&this_thread->attr);
845 CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_setstacksize: %s\n",
853 * If we got here we are going to create the thread so we must initilise the structure
854 * first because most implimentations of threading can't create it in a stopped state
855 * and it might want to do things with its structure that aren't initialised otherwise.
859 this_thread->name = name;
863 this_thread->name = "Un-named Thread";
866 this_thread->flags = flags;
867 this_thread->thread_func = thread_func;
868 this_thread->user_args = args;
869 /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the
870 * load average for the system. If we don't do this then we create a mass of threads at the same time
871 * because the creation didn't affect the load average.
873 this_thread->avg_blocked = 2;
876 * We pass this_thread into the thread as its args so that it can find out information
877 * about itself and it has a bit of storage space for itself, not to mention that the REAL
878 * thread function needs to finish off the setup of the structure
880 if ((ret = pthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0))
883 CtdlLogPrintf(CTDL_ALERT, "Thread system, Can't create thread: %s\n",
885 pthread_mutex_unlock(&this_thread->ThreadMutex);
886 pthread_mutex_destroy(&(this_thread->ThreadMutex));
887 pthread_cond_destroy(&(this_thread->ThreadCond));
888 pthread_mutex_destroy(&(this_thread->SleepMutex));
889 pthread_cond_destroy(&(this_thread->SleepCond));
890 pthread_attr_destroy(&this_thread->attr);
895 num_threads++; // Increase the count of threads in the system.
896 if(this_thread->flags & CTDLTHREAD_WORKER)
899 this_thread->next = CtdlThreadList;
900 CtdlThreadList = this_thread;
901 if (this_thread->next)
902 this_thread->next->prev = this_thread;
904 pthread_mutex_unlock(&this_thread->ThreadMutex);
906 ctdl_thread_internal_calc_loadavg();
911 * Wrapper function to create a thread
912 * ensures the critical section and other protections are in place.
913 * char *name = name to give to thread, if NULL, use generic name
914 * int flags = flags to determine type of thread and standard facilities
916 struct CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args)
918 struct CtdlThreadNode *ret = NULL;
920 begin_critical_section(S_THREAD_LIST);
921 ret = ctdl_internal_create_thread(name, flags, thread_func, args);
922 end_critical_section(S_THREAD_LIST);
929 * Function to schedule a thread to be started at a later time.
930 * Takes the S_SCHEDULE_LIST critical section itself while linking the new node, so the caller must not already hold it
/*
 * Prepare a thread that is to be started later and queue it on
 * CtdlThreadSchedList. No thread is created here; that happens in
 * ctdl_thread_internal_check_scheduled() once `when` has passed.
 *
 * name:        name for the thread; the visible code also has a branch that
 *              assigns "Un-named Thread".
 * flags:       CTDLTHREAD_* bits; CTDLTHREAD_BIGSTACK sets the stack size in
 *              the node's attr object to THREADSTACKSIZE.
 * thread_func: function the thread is to run.
 * args:        argument handed to thread_func.
 * when:        time after which the thread may be started.
 *
 * The node is allocated, zeroed, given its mutexes, condition variables and
 * attr object, put in state CTDL_THREAD_CREATE and then linked in at the head
 * of the schedule list under S_SCHEDULE_LIST. On an attr failure the
 * synchronisation objects are destroyed again.
 * NOTE(review): the return statements and the release of the node memory on
 * the failure paths are not visible in this excerpt.
 * NOTE(review): the section comment preceding this function speaks of
 * S_THREAD_LIST, but the code takes S_SCHEDULE_LIST itself; num_threads is
 * read for the limit check without any visible lock.
 */
932 struct CtdlThreadNode *CtdlThreadSchedule(char *name, long flags, void *(*thread_func) (void *arg), void *args, time_t when)
935 struct CtdlThreadNode *this_thread;
937 if (num_threads >= 32767)
939 CtdlLogPrintf(CTDL_EMERG, "Thread system. Thread list full.\n");
943 this_thread = malloc(sizeof(struct CtdlThreadNode));
944 if (this_thread == NULL) {
945 CtdlLogPrintf(CTDL_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n");
948 // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
949 memset (this_thread, 0, sizeof(struct CtdlThreadNode));
951 /* Create the mutex's early so we can use them */
952 pthread_mutex_init (&(this_thread->ThreadMutex), NULL);
953 pthread_cond_init (&(this_thread->ThreadCond), NULL);
954 pthread_mutex_init (&(this_thread->SleepMutex), NULL);
955 pthread_cond_init (&(this_thread->SleepCond), NULL);
957 this_thread->state = CTDL_THREAD_CREATE;
959 if ((ret = pthread_attr_init(&this_thread->attr))) {
960 pthread_mutex_destroy(&(this_thread->ThreadMutex));
961 pthread_cond_destroy(&(this_thread->ThreadCond));
962 pthread_mutex_destroy(&(this_thread->SleepMutex));
963 pthread_cond_destroy(&(this_thread->SleepCond));
964 CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_init: %s\n", strerror(ret));
969 /* Our per-thread stacks need to be bigger than the default size,
970 * otherwise the MIME parser crashes on FreeBSD, and the IMAP service
971 * crashes on 64-bit Linux.
973 if (flags & CTDLTHREAD_BIGSTACK)
975 CtdlLogPrintf(CTDL_INFO, "Thread system. Creating BIG STACK thread.\n");
976 if ((ret = pthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) {
977 pthread_mutex_destroy(&(this_thread->ThreadMutex));
978 pthread_cond_destroy(&(this_thread->ThreadCond));
979 pthread_mutex_destroy(&(this_thread->SleepMutex));
980 pthread_cond_destroy(&(this_thread->SleepCond));
981 pthread_attr_destroy(&this_thread->attr);
982 CtdlLogPrintf(CTDL_EMERG, "Thread system, pthread_attr_setstacksize: %s\n",
990 * If we got here we are going to create the thread so we must initilise the structure
991 * first because most implimentations of threading can't create it in a stopped state
992 * and it might want to do things with its structure that aren't initialised otherwise.
996 this_thread->name = name;
1000 this_thread->name = "Un-named Thread";
1003 this_thread->flags = flags;
1004 this_thread->thread_func = thread_func;
1005 this_thread->user_args = args;
1006 /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the
1007 * load average for the system. If we don't do this then we create a mass of threads at the same time
1008 * because the creation didn't affect the load average.
1010 this_thread->avg_blocked = 2;
1013 * When to start this thread
1015 this_thread->when = when;
1017 begin_critical_section(S_SCHEDULE_LIST);
1018 this_thread->next = CtdlThreadSchedList;
1019 CtdlThreadSchedList = this_thread;
1020 if (this_thread->next)
1021 this_thread->next->prev = this_thread;
1022 end_critical_section(S_SCHEDULE_LIST);
1029 struct CtdlThreadNode *ctdl_thread_internal_start_scheduled (struct CtdlThreadNode *this_thread)
1034 * We pass this_thread into the thread as its args so that it can find out information
1035 * about itself and it has a bit of storage space for itself, not to mention that the REAL
1036 * thread function needs to finish off the setup of the structure
1038 if ((ret = pthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0))
1041 CtdlLogPrintf(CTDL_ALERT, "Thread system, Can't create thread: %s\n",
1047 num_threads++; // Increase the count of threads in the system.
1048 if(this_thread->flags & CTDLTHREAD_WORKER)
1051 this_thread->next = CtdlThreadList;
1052 CtdlThreadList = this_thread;
1053 if (this_thread->next)
1054 this_thread->next->prev = this_thread;
/*
 * Start every scheduled thread whose start time has passed.
 *
 * The schedule list is only examined if S_SCHEDULE_LIST can be taken without
 * blocking; otherwise the check is left for the next call. For each node
 * whose `when` lies before `now`:
 *  - the node is unlinked from CtdlThreadSchedList and its next/prev pointers
 *    are cleared,
 *  - inside S_THREAD_LIST, and only while the calling thread's state compares
 *    greater than CTDL_THREAD_STOP_REQ (the system is not stopping), the
 *    thread is started with ctdl_thread_internal_start_scheduled() while the
 *    node's ThreadMutex is held,
 *  - if starting fails, the node's mutexes, condition variables and attr
 *    object are destroyed; on success the start is logged and the load
 *    averages are recomputed.
 * Nodes that are not yet due only produce a debug line under WITH_THREADLOG.
 * NOTE(review): the loop header, the assignment of `now`, the release of node
 * memory and what happens to a due node when the system is stopping are not
 * visible in this excerpt.
 */
1061 void ctdl_thread_internal_check_scheduled(void)
1063 struct CtdlThreadNode *this_thread, *that_thread;
1066 if (try_critical_section(S_SCHEDULE_LIST))
1067 return; /* If this list is locked we wait till the next chance */
1071 #ifdef WITH_THREADLOG
1072 CtdlLogPrintf(CTDL_DEBUG, "Checking for scheduled threads to start.\n");
1075 this_thread = CtdlThreadSchedList;
1078 that_thread = this_thread;
1079 this_thread = this_thread->next;
1081 if (now > that_thread->when)
1083 /* Unlink from schedule list */
1084 if (that_thread->prev)
1085 that_thread->prev->next = that_thread->next;
1087 CtdlThreadSchedList = that_thread->next;
1088 if (that_thread->next)
1089 that_thread->next->prev = that_thread->prev;
1091 that_thread->next = that_thread->prev = NULL;
1092 #ifdef WITH_THREADLOG
1093 CtdlLogPrintf(CTDL_DEBUG, "About to start scheduled thread \"%s\".\n", that_thread->name);
1095 begin_critical_section(S_THREAD_LIST);
1096 if (CT->state > CTDL_THREAD_STOP_REQ)
1097 { /* Only start it if the system is not stopping */
1098 pthread_mutex_lock(&that_thread->ThreadMutex);
1099 if (ctdl_thread_internal_start_scheduled (that_thread) == NULL)
1101 #ifdef WITH_THREADLOG
1102 CtdlLogPrintf(CTDL_DEBUG, "Failed to start scheduled thread \"%s\".\n", that_thread->name);
1104 pthread_mutex_unlock(&that_thread->ThreadMutex);
1105 pthread_mutex_destroy(&(that_thread->ThreadMutex));
1106 pthread_cond_destroy(&(that_thread->ThreadCond));
1107 pthread_mutex_destroy(&(that_thread->SleepMutex));
1108 pthread_cond_destroy(&(that_thread->SleepCond));
1109 pthread_attr_destroy(&that_thread->attr);
1114 CtdlLogPrintf(CTDL_INFO, "Thread system, Started a scheduled thread \"%s\" (%ld).\n",
1115 that_thread->name, that_thread->tid);
1116 pthread_mutex_unlock(&that_thread->ThreadMutex);
1117 ctdl_thread_internal_calc_loadavg();
1120 end_critical_section(S_THREAD_LIST);
1124 #ifdef WITH_THREADLOG
1125 CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" will start in %ld seconds.\n", that_thread->name, that_thread->when - time(NULL));
1129 end_critical_section(S_SCHEDULE_LIST);
1134 * A wrapper function for select so we can show a thread as blocked
1136 int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
1140 ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1141 ret = select(n, readfds, writefds, exceptfds, timeout);
1142 ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
/*
 * Main routine of the startup thread once the server goes multi-threaded.
 *
 * It initialises the thread system, makes the second call to the module
 * initialisation (initialise_modules(1)) and then turns the calling thread
 * into the garbage collector, looping for as long as CtdlThreadGetCount()
 * is non-zero. The visible parts of the loop:
 *  - pick up a pending signal from CT->signal and ask all threads to stop,
 *  - while the system is not stopping, recompute the load averages under
 *    S_THREAD_LIST and start any scheduled threads that are due,
 *  - shrink the worker pool: when there are more than config.c_min_workers
 *    workers and the worker average is below 20, search the list for a
 *    worker whose state compares greater than CTDL_THREAD_STOPPING and ask
 *    it to stop,
 *  - grow the worker pool: when the workers are busy (worker average above
 *    60, overall load below 90, fewer than config.c_max_workers) or there are
 *    fewer than config.c_min_workers, create worker threads in a batch of
 *    up to 5,
 *  - handle shutdown when only the garbage collector itself is left.
 * After the loop ctdl_thread_internal_cleanup() releases the remaining
 * resources.
 * NOTE(review): many statements of the loop (the conditions around the
 * signal handling, the calls made during shutdown, the remaining arguments of
 * CtdlThreadCreate() and any sleeping between passes) are not visible in this
 * excerpt.
 * NOTE(review): the "Startup thread %d" log call passes pthread_self() for a
 * %d conversion; that type is not guaranteed to be an int - confirm and
 * adjust the format or cast.
 * NOTE(review): CtdlThreadWorkerAvg is read directly in the shrink test,
 * without the S_THREAD_LIST protection that CtdlThreadGetWorkerAvg() uses in
 * the grow test.
 */
1151 void go_threading(void)
1154 struct CtdlThreadNode *last_worker;
1157 * Initialise the thread system
1159 ctdl_thread_internal_init();
1161 * Now create a bunch of worker threads.
1163 // CtdlLogPrintf(CTDL_DEBUG, "Starting %d worker threads\n", config.c_min_workers);
1164 // begin_critical_section(S_THREAD_LIST);
1165 // i=0; /* Always start at least 1 worker thread */
1168 // ctdl_internal_create_thread("Worker Thread", CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER, worker_thread, NULL);
1169 // } while (++i < config.c_min_workers);
1170 // end_critical_section(S_THREAD_LIST);
1172 /* Second call to module init functions now that threading is up */
1173 initialise_modules(1);
1176 * This thread is now used for garbage collection of other threads in the thread list
1178 CtdlLogPrintf(CTDL_INFO, "Startup thread %d becoming garbage collector,\n", pthread_self());
1181 * We do a lot of locking and unlocking of the thread list in here.
1182 * We do this so that we can repeatedly release time for other threads
1183 * that may be waiting on the thread list.
1184 * We are a low priority thread so we can afford to do this
1187 while (CtdlThreadGetCount())
1190 exit_signal = CT->signal;
1192 CtdlThreadStopAll();
1193 check_sched_shutdown();
1194 if (CT->state > CTDL_THREAD_STOP_REQ)
1196 begin_critical_section(S_THREAD_LIST);
1197 ctdl_thread_internal_calc_loadavg();
1198 end_critical_section(S_THREAD_LIST);
1200 ctdl_thread_internal_check_scheduled(); /* start scheduled threads */
1203 /* Reduce the size of the worker thread pool if necessary. */
1204 if ((CtdlThreadGetWorkers() > config.c_min_workers) && (CtdlThreadWorkerAvg < 20) && (CT->state > CTDL_THREAD_STOP_REQ))
1206 /* Ask a worker thread to stop as we no longer need it */
1207 begin_critical_section(S_THREAD_LIST);
1208 last_worker = CtdlThreadList;
1211 pthread_mutex_lock(&last_worker->ThreadMutex);
1212 if (last_worker->flags & CTDLTHREAD_WORKER && last_worker->state > CTDL_THREAD_STOPPING)
1214 pthread_mutex_unlock(&last_worker->ThreadMutex);
1217 pthread_mutex_unlock(&last_worker->ThreadMutex);
1218 last_worker = last_worker->next;
1220 end_critical_section(S_THREAD_LIST);
1223 #ifdef WITH_THREADLOG
1224 CtdlLogPrintf(CTDL_DEBUG, "Thread system, stopping excess worker thread \"%s\" (%ld).\n",
1229 CtdlThreadStop(last_worker);
1234 * If all our workers are working hard, start some more to help out
1237 /* FIXME: come up with a better way to dynamically alter the number of threads
1238 * based on the system load
1240 // if ((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkers() < num_sessions))
1241 // && (CtdlThreadLoadAvg < 90) )
1242 if ((((CtdlThreadGetWorkers() < config.c_max_workers) && (CtdlThreadGetWorkerAvg() > 60) && (CtdlThreadGetLoadAvg() < 90) ) || CtdlThreadGetWorkers() < config.c_min_workers) && (CT->state > CTDL_THREAD_STOP_REQ))
1244 for (i=0; i<5 ; i++)
1245 // for (i=0; i< (num_sessions - CtdlThreadGetWorkers()) ; i++)
1246 // for (i=0; i< (10 - (55 - CtdlThreadWorkerAvg) / CtdlThreadWorkerAvg / CtdlThreadGetWorkers()) ; i++)
1248 // begin_critical_section(S_THREAD_LIST);
1249 CtdlThreadCreate("Worker Thread",
1250 CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER,
1254 // end_critical_section(S_THREAD_LIST);
1260 if (CtdlThreadGetCount() <= 1) // Shutting down clean up the garbage collector
1265 if (CtdlThreadGetCount())
1269 * If the above loop exits we must be shutting down since we obviously have no threads
1271 ctdl_thread_internal_cleanup();