9dee678ffe350ef1e0575f052dc1ead0fa8a07ae
[citadel.git] / citadel / threads.c
1 /*
2  * $Id: sysdep.c 5882 2007-12-13 19:46:05Z davew $
3  *
4  * Citadel "system dependent" stuff.
5  * See copyright.txt for copyright information.
6  *
 * Here's where we have the Citadel thread implementation.
8  *
9  */
10
11 #include <sys/types.h>
12 #include <errno.h>
13 #include <sys/socket.h>
14 #include <unistd.h>
15 #include <fcntl.h>
16 #include <signal.h>
17
18 #if TIME_WITH_SYS_TIME
19 # include <sys/time.h>
20 # include <time.h>
21 #else
22 # if HAVE_SYS_TIME_H
23 #  include <sys/time.h>
24 # else
25 #  include <time.h>
26 # endif
27 #endif
28
29 #include "threads.h"
30 #include "ctdl_module.h"
31 #include "modules_init.h"
32 #include "housekeeping.h"
33 #include "config.h"
34 #include "citserver.h"
35 #include "sysdep_decls.h"
36
37 /*
38  * define this to use the new worker_thread method of handling connections
39  */
40 //#define NEW_WORKER
41
42 /*
43  * New thread interface.
44  * To create a thread you must call one of the create thread functions.
45  * You must pass it the address of (a pointer to a CtdlThreadNode initialised to NULL) like this
46  * struct CtdlThreadNode *node = NULL;
47  * pass in &node
48  * If the thread is created *node will point to the thread control structure for the created thread.
49  * If the thread creation fails *node remains NULL
50  * Do not free the memory pointed to by *node, it doesn't belong to you.
51  * This new interface duplicates much of the eCrash stuff. We should go for closer integration since that would
52  * remove the need for the calls to eCrashRegisterThread and friends
53  */
54
static int num_threads = 0;	/* Current number of threads */
static int num_workers = 0;	/* Current number of worker threads */

/* Head of the doubly linked (next/prev) list of registered threads. */
CtdlThreadNode *CtdlThreadList = NULL;
/* Head of the scheduled-thread list; its nodes are freed by
 * ctdl_thread_internal_cleanup(). Presumably threads waiting to be
 * started later — TODO confirm. */
CtdlThreadNode *CtdlThreadSchedList = NULL;

/* Id of the thread that called ctdl_thread_internal_init(), i.e. the
 * garbage collection thread. */
static citthread_t GC_thread;
/* Human-readable name for each thread state, indexed by state value;
 * filled with strdup() by ctdl_thread_internal_init(). */
static char *CtdlThreadStates[CTDL_THREAD_LAST_STATE];
/* Overall and worker-only load averages computed by
 * ctdl_thread_internal_calc_loadavg(); read via the getters under
 * S_THREAD_LIST. */
double CtdlThreadLoadAvg = 0;
double CtdlThreadWorkerAvg = 0;
/* Key for the per-thread ThreadTSD block (see CtdlThreadAllocTSD()). */
citthread_key_t ThreadKey;

citthread_mutex_t Critters[MAX_SEMAPHORES];	/* Things needing locking */

/* Worker parking state. worker_block is initialised in
 * ctdl_thread_internal_init_tsd() and broadcast by CtdlThreadStopAll();
 * idle_workers is not referenced in the visible part of this file. */
int idle_workers = 0;
citthread_cond_t worker_block;
citthread_mutex_t worker_block_mutex;
72
73
74 void InitialiseSemaphores(void)
75 {
76         int i;
77
78         /* Set up a bunch of semaphores to be used for critical sections */
79         for (i = 0; i < MAX_SEMAPHORES; ++i) {
80                 citthread_mutex_init(&Critters[i], NULL);
81         }
82 }
83
84
85
86
87 /*
88  * Obtain a semaphore lock to begin a critical section.
89  * but only if no one else has one
90  */
91 int try_critical_section(int which_one)
92 {
93         /* For all types of critical sections except those listed here,
94          * ensure nobody ever tries to do a critical section within a
95          * transaction; this could lead to deadlock.
96          */
97         if ((which_one != S_FLOORCACHE)
98 #ifdef DEBUG_MEMORY_LEAKS
99             && (which_one != S_DEBUGMEMLEAKS)
100 #endif
101             && (which_one != S_RPLIST)
102             ) {
103                 cdb_check_handles();
104         }
105         return (citthread_mutex_trylock(&Critters[which_one]));
106 }
107
108
109 /*
110  * Obtain a semaphore lock to begin a critical section.
111  */
112 void begin_critical_section(int which_one)
113 {
114         /* CtdlLogPrintf(CTDL_DEBUG, "begin_critical_section(%d)\n", which_one); */
115
116         /* For all types of critical sections except those listed here,
117          * ensure nobody ever tries to do a critical section within a
118          * transaction; this could lead to deadlock.
119          */
120         if ((which_one != S_FLOORCACHE)
121 #ifdef DEBUG_MEMORY_LEAKS
122             && (which_one != S_DEBUGMEMLEAKS)
123 #endif
124             && (which_one != S_RPLIST)
125             ) {
126                 cdb_check_handles();
127         }
128         citthread_mutex_lock(&Critters[which_one]);
129 }
130
131 /*
132  * Release a semaphore lock to end a critical section.
133  */
134 void end_critical_section(int which_one)
135 {
136         citthread_mutex_unlock(&Critters[which_one]);
137 }
138
139
/*
 * Destructor for the thread-specific data block registered on ThreadKey.
 * A NULL block is ignored. Otherwise check_handles() is run on the block
 * and then it is freed.
 */
static void ctdl_thread_internal_dest_tsd(void *arg)
{
	if (arg == NULL)
		return;

	check_handles(arg);
	free(arg);
}
150
151
152 /*
153  * A function to initialise the thread TSD
154  */
155 void ctdl_thread_internal_init_tsd(void)
156 {
157         int ret;
158
159         if ((ret =
160              citthread_key_create(&ThreadKey,
161                                   ctdl_thread_internal_dest_tsd))) {
162                 lprintf(CTDL_EMERG, "citthread_key_create: %s\n",
163                         strerror(ret));
164                 exit(CTDLEXIT_DB);
165         }
166         citthread_mutex_init (&worker_block_mutex, NULL);
167         citthread_cond_init(&worker_block, NULL);
168 }
169
170 /*
171  * Ensure that we have a key for thread-specific data. 
172  *
173  * This should be called immediately after startup by any thread 
174  * 
175  */
176 void CtdlThreadAllocTSD(void)
177 {
178         ThreadTSD *tsd;
179
180         if (citthread_getspecific(ThreadKey) != NULL)
181                 return;
182
183         tsd = malloc(sizeof(ThreadTSD));
184
185         tsd->tid = NULL;
186
187         memset(tsd->cursors, 0, sizeof tsd->cursors);
188         tsd->self = NULL;
189
190         citthread_setspecific(ThreadKey, tsd);
191 }
192
193
194 void ctdl_thread_internal_free_tsd(void)
195 {
196         ctdl_thread_internal_dest_tsd(citthread_getspecific(ThreadKey));
197         citthread_setspecific(ThreadKey, NULL);
198 }
199
200
201 void ctdl_thread_internal_cleanup(void)
202 {
203         int i;
204         CtdlThreadNode *this_thread, *that_thread;
205
206         for (i = 0; i < CTDL_THREAD_LAST_STATE; i++) {
207                 free(CtdlThreadStates[i]);
208         }
209
210         /* Clean up the scheduled thread list */
211         this_thread = CtdlThreadSchedList;
212         while (this_thread) {
213                 that_thread = this_thread;
214                 this_thread = this_thread->next;
215                 citthread_mutex_destroy(&that_thread->ThreadMutex);
216                 citthread_cond_destroy(&that_thread->ThreadCond);
217                 citthread_mutex_destroy(&that_thread->SleepMutex);
218                 citthread_cond_destroy(&that_thread->SleepCond);
219                 citthread_attr_destroy(&that_thread->attr);
220                 free(that_thread);
221         }
222         ctdl_thread_internal_free_tsd();
223 }
224
225 void ctdl_thread_internal_init(void)
226 {
227         CtdlThreadNode *this_thread;
228         int ret = 0;
229
230         GC_thread = citthread_self();
231         CtdlThreadStates[CTDL_THREAD_INVALID] = strdup("Invalid Thread");
232         CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread");
233         CtdlThreadStates[CTDL_THREAD_CREATE] =
234             strdup("Thread being Created");
235         CtdlThreadStates[CTDL_THREAD_CANCELLED] =
236             strdup("Thread Cancelled");
237         CtdlThreadStates[CTDL_THREAD_EXITED] = strdup("Thread Exited");
238         CtdlThreadStates[CTDL_THREAD_STOPPING] = strdup("Thread Stopping");
239         CtdlThreadStates[CTDL_THREAD_STOP_REQ] =
240             strdup("Thread Stop Requested");
241         CtdlThreadStates[CTDL_THREAD_SLEEPING] = strdup("Thread Sleeping");
242         CtdlThreadStates[CTDL_THREAD_RUNNING] = strdup("Thread Running");
243         CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked");
244
245         /* Get ourself a thread entry */
246         this_thread = malloc(sizeof(CtdlThreadNode));
247         if (this_thread == NULL) {
248                 CtdlLogPrintf(CTDL_EMERG,
249                               "Thread system, can't allocate CtdlThreadNode, exiting\n");
250                 return;
251         }
252         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
253         memset(this_thread, 0, sizeof(CtdlThreadNode));
254
255         citthread_mutex_init(&(this_thread->ThreadMutex), NULL);
256         citthread_cond_init(&(this_thread->ThreadCond), NULL);
257         citthread_mutex_init(&(this_thread->SleepMutex), NULL);
258         citthread_cond_init(&(this_thread->SleepCond), NULL);
259
260         /* We are garbage collector so create us as running */
261         this_thread->state = CTDL_THREAD_RUNNING;
262
263         if ((ret = citthread_attr_init(&this_thread->attr))) {
264                 CtdlLogPrintf(CTDL_EMERG,
265                               "Thread system, citthread_attr_init: %s\n",
266                               strerror(ret));
267                 free(this_thread);
268                 return;
269         }
270
271         this_thread->name = "Garbage Collection Thread";
272
273         this_thread->tid = GC_thread;
274         CT = this_thread;
275
276         num_threads++;          // Increase the count of threads in the system.
277
278         this_thread->next = CtdlThreadList;
279         CtdlThreadList = this_thread;
280         if (this_thread->next)
281                 this_thread->next->prev = this_thread;
282         /* Set up start times */
283         gettimeofday(&this_thread->start_time, NULL);   /* Time this thread started */
284         memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof(struct timeval));      /* Changed state so mark it. */
285 }
286
287
288 /*
289  * A function to update a threads load averages
290  */
291 void ctdl_thread_internal_update_avgs(CtdlThreadNode * this_thread)
292 {
293         struct timeval now, result;
294         double last_duration;
295
296         gettimeofday(&now, NULL);
297         timersub(&now, &(this_thread->last_state_change), &result);
298         /* I don't think these mutex's are needed here */
299         citthread_mutex_lock(&this_thread->ThreadMutex);
300         // result now has a timeval for the time we spent in the last state since we last updated
301         last_duration =
302             (double) result.tv_sec +
303             ((double) result.tv_usec / (double) 1000000);
304         if (this_thread->state == CTDL_THREAD_SLEEPING)
305                 this_thread->avg_sleeping += last_duration;
306         if (this_thread->state == CTDL_THREAD_RUNNING)
307                 this_thread->avg_running += last_duration;
308         if (this_thread->state == CTDL_THREAD_BLOCKED)
309                 this_thread->avg_blocked += last_duration;
310         memcpy(&this_thread->last_state_change, &now,
311                sizeof(struct timeval));
312         citthread_mutex_unlock(&this_thread->ThreadMutex);
313 }
314
315 /*
316  * A function to chenge the state of a thread
317  */
318 void ctdl_thread_internal_change_state(CtdlThreadNode * this_thread,
319                                        enum CtdlThreadState new_state)
320 {
321         /*
322          * Wether we change state or not we need update the load values
323          */
324         ctdl_thread_internal_update_avgs(this_thread);
325         /* This mutex not needed here? */
326         citthread_mutex_lock(&this_thread->ThreadMutex);        /* To prevent race condition of a sleeping thread */
327         if ((new_state == CTDL_THREAD_STOP_REQ)
328             && (this_thread->state > CTDL_THREAD_STOP_REQ))
329                 this_thread->state = new_state;
330         if (((new_state == CTDL_THREAD_SLEEPING)
331              || (new_state == CTDL_THREAD_BLOCKED))
332             && (this_thread->state == CTDL_THREAD_RUNNING))
333                 this_thread->state = new_state;
334         if ((new_state == CTDL_THREAD_RUNNING)
335             && ((this_thread->state == CTDL_THREAD_SLEEPING)
336                 || (this_thread->state == CTDL_THREAD_BLOCKED)))
337                 this_thread->state = new_state;
338         citthread_mutex_unlock(&this_thread->ThreadMutex);
339 }
340
341
342 /*
343  * A function to tell all threads to exit
344  */
345 void CtdlThreadStopAll(void)
346 {
347         //FIXME: The signalling of the condition should not be in the critical_section
348         // We need to build a list of threads we are going to signal and then signal them afterwards
349
350         CtdlThreadNode *this_thread;
351
352         begin_critical_section(S_THREAD_LIST);
353         this_thread = CtdlThreadList;
354         while (this_thread) {
355 #ifdef THREADS_USESIGNALS
356                 citthread_killl(this_thread->tid, SIGHUP);
357 #endif
358                 citthread_kill(this_thread->tid, SIGUSR1);
359                 ctdl_thread_internal_change_state(this_thread,
360                                                   CTDL_THREAD_STOP_REQ);
361                 citthread_cond_signal(&this_thread->ThreadCond);
362                 citthread_cond_signal(&this_thread->SleepCond);
363                 CtdlLogPrintf(CTDL_DEBUG,
364                               "Thread system stopping thread \"%s\" (%ld).\n",
365                               this_thread->name, this_thread->tid);
366                 this_thread = this_thread->next;
367         }
368         end_critical_section(S_THREAD_LIST);
369         citthread_cond_broadcast(&worker_block);
370 }
371
372
373 /*
374  * A function to wake up all sleeping threads
375  */
376 void CtdlThreadWakeAll(void)
377 {
378         CtdlThreadNode *this_thread;
379
380         CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n");
381
382         begin_critical_section(S_THREAD_LIST);
383         this_thread = CtdlThreadList;
384         while (this_thread) {
385                 if (!this_thread->thread_func) {
386                         citthread_cond_signal(&this_thread->ThreadCond);
387                         citthread_cond_signal(&this_thread->SleepCond);
388                 }
389                 this_thread = this_thread->next;
390         }
391         end_critical_section(S_THREAD_LIST);
392 }
393
394
395 /*
396  * A function to return the number of threads running in the system
397  */
398 int CtdlThreadGetCount(void)
399 {
400         return num_threads;
401 }
402
403 int CtdlThreadGetWorkers(void)
404 {
405         return num_workers;
406 }
407
408 double CtdlThreadGetWorkerAvg(void)
409 {
410         double ret;
411
412         begin_critical_section(S_THREAD_LIST);
413         ret = CtdlThreadWorkerAvg;
414         end_critical_section(S_THREAD_LIST);
415         return ret;
416 }
417
418 double CtdlThreadGetLoadAvg(void)
419 {
420         double ret;
421
422         begin_critical_section(S_THREAD_LIST);
423         ret = CtdlThreadLoadAvg;
424         end_critical_section(S_THREAD_LIST);
425         return ret;
426 }
427
428
429
430
431 /*
432  * A function to rename a thread
433  * Returns a const char *
434  */
435 const char *CtdlThreadName(const char *name)
436 {
437         const char *old_name;
438
439         if (!CT) {
440                 CtdlLogPrintf(CTDL_WARNING,
441                               "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n",
442                               name);
443                 return NULL;
444         }
445         old_name = CT->name;
446         if (name)
447                 CT->name = name;
448         return (old_name);
449 }
450
451
452 /*
453  * A function to force a thread to exit
454  */
455 void CtdlThreadCancel(CtdlThreadNode * thread)
456 {
457         CtdlThreadNode *this_thread;
458
459         if (!thread)
460                 this_thread = CT;
461         else
462                 this_thread = thread;
463         if (!this_thread) {
464                 CtdlLogPrintf(CTDL_EMERG,
465                               "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n");
466                 CtdlThreadStopAll();
467                 return;
468         }
469
470         if (!this_thread->thread_func) {
471                 CtdlLogPrintf(CTDL_EMERG,
472                               "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n");
473                 CtdlThreadStopAll();
474                 return;
475         }
476
477         ctdl_thread_internal_change_state(this_thread,
478                                           CTDL_THREAD_CANCELLED);
479         citthread_cancel(this_thread->tid);
480 }
481
482
483 /*
484  * A function for a thread to check if it has been asked to stop
485  */
486 int CtdlThreadCheckStop(void)
487 {
488         int state;
489
490         if (!CT) {
491                 CtdlLogPrintf(CTDL_EMERG,
492                               "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n");
493                 CtdlThreadStopAll();
494                 return -1;
495         }
496
497         state = CT->state;
498
499 #ifdef THREADS_USERSIGNALS
500         if (CT->signal)
501                 CtdlLogPrintf(CTDL_DEBUG,
502                               "Thread \"%s\" caught signal %d.\n",
503                               CT->name, CT->signal);
504 #endif
505         if (state == CTDL_THREAD_STOP_REQ) {
506                 CT->state = CTDL_THREAD_STOPPING;
507                 return -1;
508         } else if ((state < CTDL_THREAD_STOP_REQ)
509                    && (state > CTDL_THREAD_CREATE)) {
510                 return -1;
511         }
512         return 0;
513 }
514
515
516 /*
517  * A function to ask a thread to exit
518  * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit
519  */
520 void CtdlThreadStop(CtdlThreadNode * thread)
521 {
522         CtdlThreadNode *this_thread;
523
524         if (!thread)
525                 this_thread = CT;
526         else
527                 this_thread = thread;
528         if (!this_thread)
529                 return;
530         if (!(this_thread->thread_func))
531                 return;         // Don't stop garbage collector
532 #ifdef THREADS_USESIGNALS
533         citthread_kill(this_thread->tid, SIGHUP);
534 #endif
535         ctdl_thread_internal_change_state(this_thread,
536                                           CTDL_THREAD_STOP_REQ);
537         citthread_cond_signal(&this_thread->ThreadCond);
538         citthread_cond_signal(&this_thread->SleepCond);
539 }
540
541 /*
542  * So we now have a sleep command that works with threads but it is in seconds
543  */
544 void CtdlThreadSleep(int secs)
545 {
546         struct timespec wake_time;
547         struct timeval time_now;
548
549
550         if (!CT) {
551                 CtdlLogPrintf(CTDL_WARNING,
552                               "CtdlThreadSleep() called by something that is not a thread. Should we die?\n");
553                 return;
554         }
555
556         memset(&wake_time, 0, sizeof(struct timespec));
557         gettimeofday(&time_now, NULL);
558         wake_time.tv_sec = time_now.tv_sec + secs;
559         wake_time.tv_nsec = time_now.tv_usec * 10;
560
561         ctdl_thread_internal_change_state(CT, CTDL_THREAD_SLEEPING);
562
563         citthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */
564         citthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex,
565                                  &wake_time);
566         citthread_mutex_unlock(&CT->ThreadMutex);
567
568         ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
569 }
570
571
572 /*
573  * Routine to clean up our thread function on exit
574  */
575 static void ctdl_internal_thread_cleanup(void *arg)
576 {
577         /*
578          * In here we were called by the current thread because it is exiting
579          * NB. WE ARE THE CURRENT THREAD
580          */
581         CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n",
582                       CT->name, CT->tid);
583
584 #ifdef HAVE_BACKTRACE
585         eCrash_UnregisterThread();
586 #endif
587
588         citthread_mutex_lock(&CT->ThreadMutex);
589         CT->state = CTDL_THREAD_EXITED; // needs to be last thing else house keeping will unlink us too early
590         citthread_mutex_unlock(&CT->ThreadMutex);
591 }
592
593 /*
594  * A quick function to show the load averages
595  */
596 void ctdl_thread_internal_calc_loadavg(void)
597 {
598         CtdlThreadNode *that_thread;
599         double load_avg, worker_avg;
600         int workers = 0;
601
602         that_thread = CtdlThreadList;
603         load_avg = 0;
604         worker_avg = 0;
605         while (that_thread) {
606                 /* Update load averages */
607                 ctdl_thread_internal_update_avgs(that_thread);
608                 citthread_mutex_lock(&that_thread->ThreadMutex);
609                 that_thread->load_avg =
610                     (that_thread->avg_sleeping +
611                      that_thread->avg_running) /
612                     (that_thread->avg_sleeping + that_thread->avg_running +
613                      that_thread->avg_blocked) * 100;
614                 that_thread->avg_sleeping /= 2;
615                 that_thread->avg_running /= 2;
616                 that_thread->avg_blocked /= 2;
617                 load_avg += that_thread->load_avg;
618                 if (that_thread->flags & CTDLTHREAD_WORKER) {
619                         worker_avg += that_thread->load_avg;
620                         workers++;
621                 }
622 #ifdef WITH_THREADLOG
623                 CtdlLogPrintf(CTDL_DEBUG,
624                               "CtdlThread, \"%s\" (%lu) \"%s\" %.2f %.2f %.2f %.2f\n",
625                               that_thread->name, that_thread->tid,
626                               CtdlThreadStates[that_thread->state],
627                               that_thread->avg_sleeping,
628                               that_thread->avg_running,
629                               that_thread->avg_blocked,
630                               that_thread->load_avg);
631 #endif
632                 citthread_mutex_unlock(&that_thread->ThreadMutex);
633                 that_thread = that_thread->next;
634         }
635         CtdlThreadLoadAvg = load_avg / num_threads;
636         CtdlThreadWorkerAvg = worker_avg / workers;
637 #ifdef WITH_THREADLOG
638         CtdlLogPrintf(CTDL_INFO,
639                       "System load average %.2f, workers averag %.2f, threads %d, workers %d, sessions %d\n",
640                       CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads,
641                       num_workers, num_sessions);
642 #endif
643 }
644
645
/*
 * Garbage collection routine.
 * Gets called by main() in a loop to clean up the thread list periodically.
 *
 * Under S_THREAD_LIST, every node in CtdlThreadList whose state is
 * CTDL_THREAD_EXITED is collected in these steps:
 *  - unlink it and decrement num_threads (and num_workers for
 *    CTDLTHREAD_WORKER nodes);
 *  - signal its condition variables and cycle its ThreadMutex so it has
 *    finished what it was doing;
 *  - join it;
 *  - destroy its mutexes, condition variables and attributes, then free
 *    it.
 * Non-exited worker nodes are counted. After the lock is released, that
 * count must equal num_workers or the process aborts.
 * The process also aborts if a node with a thread_func and the caller's
 * own tid is found EXITED, or if num_threads <= 0 while collecting.
 */
void CtdlThreadGC(void)
{
	CtdlThreadNode *this_thread, *that_thread;
	int workers = 0, sys_workers;
	int ret = 0;

	begin_critical_section(S_THREAD_LIST);

	/* Handle exiting of garbage collector thread: when only one node is
	 * left it is marked EXITED so it gets collected too. Presumably that
	 * last node is the garbage collector itself — confirm. After it is
	 * freed, CT for this thread would point at freed memory. */
	if (num_threads == 1)
		CtdlThreadList->state = CTDL_THREAD_EXITED;

#ifdef WITH_THREADLOG
	CtdlLogPrintf(CTDL_DEBUG,
		      "Thread system running garbage collection.\n");
#endif
	/*
	 * Woke up to do garbage collection
	 */
	this_thread = CtdlThreadList;
	while (this_thread) {
		/* Advance before that_thread may be freed below */
		that_thread = this_thread;
		this_thread = this_thread->next;

		/* Do we need to clean up this thread? */
		if (that_thread->state != CTDL_THREAD_EXITED) {
			if (that_thread->flags & CTDLTHREAD_WORKER)
				workers++;	/* Sanity check on number of worker threads */
			continue;
		}

		/* Only the garbage-collector node (no thread_func) may be
		 * collected by its own thread. */
		if (citthread_equal(that_thread->tid, citthread_self()) && that_thread->thread_func) {	/* Sanity check */
			end_critical_section(S_THREAD_LIST);
			CtdlLogPrintf(CTDL_EMERG,
				      "Thread system PANIC, a thread is trying to clean up after itself.\n");
			abort();
			return;
		}

		if (num_threads <= 0) {	/* Sanity check */
			end_critical_section(S_THREAD_LIST);
			CtdlLogPrintf(CTDL_EMERG,
				      "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n");
			abort();
			return;
		}

		if (that_thread->flags & CTDLTHREAD_WORKER)
			num_workers--;	/* This is a worker thread so reduce the count. */
		num_threads--;
		/* If we are unlinking the list head then the next becomes the list head */
		if (that_thread->prev)
			that_thread->prev->next = that_thread->next;
		else
			CtdlThreadList = that_thread->next;
		if (that_thread->next)
			that_thread->next->prev = that_thread->prev;

		citthread_cond_signal(&that_thread->ThreadCond);
		citthread_cond_signal(&that_thread->SleepCond);	// Make sure this thread is awake
		citthread_mutex_lock(&that_thread->ThreadMutex);	// Make sure it has done what it's doing
		citthread_mutex_unlock(&that_thread->ThreadMutex);
		/*
		 * Join on the thread to do clean up and prevent memory leaks.
		 * This also makes sure the thread has cleaned up after itself
		 * before we free its entry.
		 * Joining on the garbage collector's own thread is allowed;
		 * the join should just return EDEADLK, which is only logged.
		 */
		ret = citthread_join(that_thread->tid, NULL);
		if (ret == EDEADLK)
			CtdlLogPrintf(CTDL_DEBUG,
				      "Garbage collection on own thread.\n");
		else if (ret == EINVAL)
			CtdlLogPrintf(CTDL_DEBUG,
				      "Garbage collection, that thread already joined on.\n");
		else if (ret == ESRCH)
			CtdlLogPrintf(CTDL_DEBUG,
				      "Garbage collection, no thread to join on.\n");
		else if (ret != 0)
			CtdlLogPrintf(CTDL_DEBUG,
				      "Garbage collection, citthread_join returned an unknown error.\n");
		/*
		 * Now we own that thread entry
		 */
		CtdlLogPrintf(CTDL_INFO,
			      "Garbage Collection for thread \"%s\" (%ld).\n",
			      that_thread->name, that_thread->tid);
		citthread_mutex_destroy(&that_thread->ThreadMutex);
		citthread_cond_destroy(&that_thread->ThreadCond);
		citthread_mutex_destroy(&that_thread->SleepMutex);
		citthread_cond_destroy(&that_thread->SleepCond);
		citthread_attr_destroy(&that_thread->attr);
		free(that_thread);
	}
	/* Snapshot while still holding the lock; compared after release */
	sys_workers = num_workers;
	end_critical_section(S_THREAD_LIST);

	/* Sanity check number of worker threads */
	if (workers != sys_workers) {
		CtdlLogPrintf(CTDL_EMERG,
			      "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n",
			      workers, sys_workers);
		abort();
	}
}
754
755
756
757
/*
 * Runtime function for a Citadel Thread.
 * This initialises the thread's environment and then calls the
 * user-supplied thread function.
 * Note that this is the REAL thread function and wraps the user's thread
 * function.
 *
 * `arg` is the thread's CtdlThreadNode. Under S_THREAD_LIST it does the
 * following:
 *  - records start_time;
 *  - registers ctdl_internal_thread_cleanup() as a cleanup handler;
 *  - allocates thread-specific data and sets CT to the node;
 *  - records pid and last_state_change;
 *  - marks the node CTDL_THREAD_RUNNING unless a stop was already
 *    requested.
 * It then registers with eCrash (HAVE_BACKTRACE), logs its creation, and
 * runs thread_func(user_args) unless asked to stop. Finally it pops the
 * cleanup handler with execute=1, which marks the node EXITED for
 * CtdlThreadGC().
 * Returns thread_func's result, or NULL if it never ran.
 *
 * NOTE(review): citthread_cleanup_push/pop presumably wrap
 * pthread_cleanup_push/pop. Those are usually macros that must stay in the
 * same lexical scope, so keep the two calls paired exactly as they are.
 * Presumably the handler also runs if the thread is cancelled via
 * CtdlThreadCancel() — confirm.
 */
static void *ctdl_internal_thread_func(void *arg)
{
	CtdlThreadNode *this_thread;
	void *ret = NULL;

	/* lock and unlock the thread list.
	 * This causes this thread to wait until all its creation stuff has
	 * finished before it can continue its execution.
	 */
	begin_critical_section(S_THREAD_LIST);
	this_thread = (CtdlThreadNode *) arg;
	gettimeofday(&this_thread->start_time, NULL);	/* Time this thread started */
//	citthread_mutex_lock(&this_thread->ThreadMutex);

	// Register the cleanup function to take care of when we exit.
	citthread_cleanup_push(ctdl_internal_thread_cleanup, NULL);
	// Get our thread data structure
	CtdlThreadAllocTSD();
	CT = this_thread;
	this_thread->pid = getpid();
	memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof(struct timeval));	/* Changed state so mark it. */
	/* Only change to running state if we weren't asked to stop during
	 * the create cycle. Otherwise there is a window that lets this
	 * thread's creation continue to full grown and thereby prevent a
	 * shutdown of the server.
	 */
//	citthread_mutex_unlock(&this_thread->ThreadMutex);

	if (!CtdlThreadCheckStop()) {
		citthread_mutex_lock(&this_thread->ThreadMutex);
		this_thread->state = CTDL_THREAD_RUNNING;
		citthread_mutex_unlock(&this_thread->ThreadMutex);
	}
	end_critical_section(S_THREAD_LIST);

	// Register for tracing
#ifdef HAVE_BACKTRACE
	eCrash_RegisterThread(this_thread->name, 0);
#endif

	// Tell the world we are here
	CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n",
		      this_thread->name, this_thread->tid);



	/*
	 * run the thread to do the work but only if we haven't been asked to stop
	 */
	if (!CtdlThreadCheckStop())
		ret = (this_thread->thread_func) (this_thread->user_args);

	/*
	 * Our thread is exiting either because it wanted to end or because
	 * the server is stopping. We need to clean up.
	 */
	citthread_cleanup_pop(1);	// Execute our cleanup routine and remove it

	return (ret);
}
822
823
824
825
826 /*
827  * Function to initialise an empty thread structure
828  */
829 CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode *
830                                                  this_thread, long flags)
831 {
832         int ret = 0;
833
834         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
835         memset(this_thread, 0, sizeof(CtdlThreadNode));
836
837         /* Create the mutex's early so we can use them */
838         citthread_mutex_init(&(this_thread->ThreadMutex), NULL);
839         citthread_cond_init(&(this_thread->ThreadCond), NULL);
840         citthread_mutex_init(&(this_thread->SleepMutex), NULL);
841         citthread_cond_init(&(this_thread->SleepCond), NULL);
842
843         this_thread->state = CTDL_THREAD_CREATE;
844
845         if ((ret = citthread_attr_init(&this_thread->attr))) {
846                 citthread_mutex_unlock(&this_thread->ThreadMutex);
847                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
848                 citthread_cond_destroy(&(this_thread->ThreadCond));
849                 citthread_mutex_destroy(&(this_thread->SleepMutex));
850                 citthread_cond_destroy(&(this_thread->SleepCond));
851                 CtdlLogPrintf(CTDL_EMERG,
852                               "Thread system, citthread_attr_init: %s\n",
853                               strerror(ret));
854                 free(this_thread);
855                 return NULL;
856         }
857
858         /* Our per-thread stacks need to be bigger than the default size,
859          * otherwise the MIME parser crashes on FreeBSD, and the IMAP service
860          * crashes on 64-bit Linux.
861          */
862         if (flags & CTDLTHREAD_BIGSTACK) {
863 #ifdef WITH_THREADLOG
864                 CtdlLogPrintf(CTDL_INFO,
865                               "Thread system. Creating BIG STACK thread.\n");
866 #endif
867                 if ((ret =
868                      citthread_attr_setstacksize(&this_thread->attr,
869                                                  THREADSTACKSIZE))) {
870                         citthread_mutex_unlock(&this_thread->ThreadMutex);
871                         citthread_mutex_destroy(&
872                                                 (this_thread->
873                                                  ThreadMutex));
874                         citthread_cond_destroy(&(this_thread->ThreadCond));
875                         citthread_mutex_destroy(&
876                                                 (this_thread->SleepMutex));
877                         citthread_cond_destroy(&(this_thread->SleepCond));
878                         citthread_attr_destroy(&this_thread->attr);
879                         CtdlLogPrintf(CTDL_EMERG,
880                                       "Thread system, citthread_attr_setstacksize: %s\n",
881                                       strerror(ret));
882                         free(this_thread);
883                         return NULL;
884                 }
885         }
886
887         /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the
888          * load average for the system. If we don't do this then we create a mass of threads at the same time 
889          * because the creation didn't affect the load average.
890          */
891         this_thread->avg_blocked = 2;
892
893         return (this_thread);
894 }
895
896
897
898
899 /*
900  * Internal function to create a thread.
901  */
902 CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags,
903                                             void *(*thread_func) (void
904                                                                   *arg),
905                                             void *args)
906 {
907         int ret = 0;
908         CtdlThreadNode *this_thread;
909
910         if (num_threads >= 32767) {
911                 CtdlLogPrintf(CTDL_EMERG,
912                               "Thread system. Thread list full.\n");
913                 return NULL;
914         }
915
916         this_thread = malloc(sizeof(CtdlThreadNode));
917         if (this_thread == NULL) {
918                 CtdlLogPrintf(CTDL_EMERG,
919                               "Thread system, can't allocate CtdlThreadNode, exiting\n");
920                 return NULL;
921         }
922
923         /* Initialise the thread structure */
924         if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) {
925                 free(this_thread);
926                 CtdlLogPrintf(CTDL_EMERG,
927                               "Thread system, can't initialise CtdlThreadNode, exiting\n");
928                 return NULL;
929         }
930         /*
931          * If we got here we are going to create the thread so we must initilise the structure
932          * first because most implimentations of threading can't create it in a stopped state
933          * and it might want to do things with its structure that aren't initialised otherwise.
934          */
935         if (name) {
936                 this_thread->name = name;
937         } else {
938                 this_thread->name = "Un-named Thread";
939         }
940
941         this_thread->flags = flags;
942         this_thread->thread_func = thread_func;
943         this_thread->user_args = args;
944
945 //      citthread_mutex_lock(&this_thread->ThreadMutex);
946
947         begin_critical_section(S_THREAD_LIST);
948         /*
949          * We pass this_thread into the thread as its args so that it can find out information
950          * about itself and it has a bit of storage space for itself, not to mention that the REAL
951          * thread function needs to finish off the setup of the structure
952          */
953         if ((ret =
954              citthread_create(&this_thread->tid, &this_thread->attr,
955                               ctdl_internal_thread_func,
956                               this_thread) != 0)) {
957                 end_critical_section(S_THREAD_LIST);
958                 CtdlLogPrintf(CTDL_ALERT,
959                               "Thread system, Can't create thread: %s\n",
960                               strerror(ret));
961                 citthread_mutex_unlock(&this_thread->ThreadMutex);
962                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
963                 citthread_cond_destroy(&(this_thread->ThreadCond));
964                 citthread_mutex_destroy(&(this_thread->SleepMutex));
965                 citthread_cond_destroy(&(this_thread->SleepCond));
966                 citthread_attr_destroy(&this_thread->attr);
967                 free(this_thread);
968                 return NULL;
969         }
970
971         num_threads++;          // Increase the count of threads in the system.
972         if (this_thread->flags & CTDLTHREAD_WORKER)
973                 num_workers++;
974
975         this_thread->next = CtdlThreadList;
976         CtdlThreadList = this_thread;
977         if (this_thread->next)
978                 this_thread->next->prev = this_thread;
979         ctdl_thread_internal_calc_loadavg();
980
981 //      citthread_mutex_unlock(&this_thread->ThreadMutex);
982         end_critical_section(S_THREAD_LIST);
983
984         return this_thread;
985 }
986
987 /*
988  * Wrapper function to create a thread
989  * ensures the critical section and other protections are in place.
990  * char *name = name to give to thread, if NULL, use generic name
991  * int flags = flags to determine type of thread and standard facilities
992  */
993 CtdlThreadNode *CtdlThreadCreate(char *name, long flags,
994                                  void *(*thread_func) (void *arg),
995                                  void *args)
996 {
997         CtdlThreadNode *ret = NULL;
998
999         ret = ctdl_internal_create_thread(name, flags, thread_func, args);
1000         return ret;
1001 }
1002
1003
1004
1005 /*
1006  * Internal function to schedule a thread.
1007  * Must be called from within a S_THREAD_LIST critical section
1008  */
1009 CtdlThreadNode *CtdlThreadSchedule(char *name, long flags,
1010                                    void *(*thread_func) (void *arg),
1011                                    void *args, time_t when)
1012 {
1013         CtdlThreadNode *this_thread;
1014
1015         if (num_threads >= 32767) {
1016                 CtdlLogPrintf(CTDL_EMERG,
1017                               "Thread system. Thread list full.\n");
1018                 return NULL;
1019         }
1020
1021         this_thread = malloc(sizeof(CtdlThreadNode));
1022         if (this_thread == NULL) {
1023                 CtdlLogPrintf(CTDL_EMERG,
1024                               "Thread system, can't allocate CtdlThreadNode, exiting\n");
1025                 return NULL;
1026         }
1027         /* Initialise the thread structure */
1028         if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) {
1029                 free(this_thread);
1030                 CtdlLogPrintf(CTDL_EMERG,
1031                               "Thread system, can't initialise CtdlThreadNode, exiting\n");
1032                 return NULL;
1033         }
1034
1035         /*
1036          * If we got here we are going to create the thread so we must initilise the structure
1037          * first because most implimentations of threading can't create it in a stopped state
1038          * and it might want to do things with its structure that aren't initialised otherwise.
1039          */
1040         if (name) {
1041                 this_thread->name = name;
1042         } else {
1043                 this_thread->name = "Un-named Thread";
1044         }
1045
1046         this_thread->flags = flags;
1047         this_thread->thread_func = thread_func;
1048         this_thread->user_args = args;
1049
1050         /*
1051          * When to start this thread
1052          */
1053         this_thread->when = when;
1054
1055         begin_critical_section(S_SCHEDULE_LIST);
1056         this_thread->next = CtdlThreadSchedList;
1057         CtdlThreadSchedList = this_thread;
1058         if (this_thread->next)
1059                 this_thread->next->prev = this_thread;
1060         end_critical_section(S_SCHEDULE_LIST);
1061
1062         return this_thread;
1063 }
1064
1065
1066
1067 CtdlThreadNode *ctdl_thread_internal_start_scheduled(CtdlThreadNode *
1068                                                      this_thread)
1069 {
1070         int ret = 0;
1071
1072 //      citthread_mutex_lock(&that_thread->ThreadMutex);
1073         begin_critical_section(S_THREAD_LIST);
1074         /*
1075          * We pass this_thread into the thread as its args so that it can find out information
1076          * about itself and it has a bit of storage space for itself, not to mention that the REAL
1077          * thread function needs to finish off the setup of the structure
1078          */
1079         if ((ret =
1080              citthread_create(&this_thread->tid, &this_thread->attr,
1081                               ctdl_internal_thread_func,
1082                               this_thread) != 0)) {
1083                 end_critical_section(S_THREAD_LIST);
1084                 CtdlLogPrintf(CTDL_DEBUG,
1085                               "Failed to start scheduled thread \"%s\": %s\n",
1086                               this_thread->name, strerror(ret));
1087 //              citthread_mutex_unlock(&this_thread->ThreadMutex);
1088                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
1089                 citthread_cond_destroy(&(this_thread->ThreadCond));
1090                 citthread_mutex_destroy(&(this_thread->SleepMutex));
1091                 citthread_cond_destroy(&(this_thread->SleepCond));
1092                 citthread_attr_destroy(&this_thread->attr);
1093                 free(this_thread);
1094                 return NULL;
1095         }
1096
1097
1098         num_threads++;          // Increase the count of threads in the system.
1099         if (this_thread->flags & CTDLTHREAD_WORKER)
1100                 num_workers++;
1101
1102         this_thread->next = CtdlThreadList;
1103         CtdlThreadList = this_thread;
1104         if (this_thread->next)
1105                 this_thread->next->prev = this_thread;
1106 //      citthread_mutex_unlock(&that_thread->ThreadMutex);
1107
1108         ctdl_thread_internal_calc_loadavg();
1109         end_critical_section(S_THREAD_LIST);
1110
1111
1112         return this_thread;
1113 }
1114
1115
1116
1117 void ctdl_thread_internal_check_scheduled(void)
1118 {
1119         CtdlThreadNode *this_thread, *that_thread;
1120         time_t now;
1121
1122         if (try_critical_section(S_SCHEDULE_LIST))
1123                 return;         /* If this list is locked we wait till the next chance */
1124
1125         now = time(NULL);
1126
1127 #ifdef WITH_THREADLOG
1128         CtdlLogPrintf(CTDL_DEBUG,
1129                       "Checking for scheduled threads to start.\n");
1130 #endif
1131
1132         this_thread = CtdlThreadSchedList;
1133         while (this_thread) {
1134                 that_thread = this_thread;
1135                 this_thread = this_thread->next;
1136
1137                 if (now > that_thread->when) {
1138                         /* Unlink from schedule list */
1139                         if (that_thread->prev)
1140                                 that_thread->prev->next =
1141                                     that_thread->next;
1142                         else
1143                                 CtdlThreadSchedList = that_thread->next;
1144                         if (that_thread->next)
1145                                 that_thread->next->prev =
1146                                     that_thread->prev;
1147
1148                         that_thread->next = that_thread->prev = NULL;
1149 #ifdef WITH_THREADLOG
1150                         CtdlLogPrintf(CTDL_DEBUG,
1151                                       "About to start scheduled thread \"%s\".\n",
1152                                       that_thread->name);
1153 #endif
1154                         if (CT->state > CTDL_THREAD_STOP_REQ) { /* Only start it if the system is not stopping */
1155                                 if (ctdl_thread_internal_start_scheduled
1156                                     (that_thread)) {
1157 #ifdef WITH_THREADLOG
1158                                         CtdlLogPrintf(CTDL_INFO,
1159                                                       "Thread system, Started a scheduled thread \"%s\" (%ud).\n",
1160                                                       that_thread->name,
1161                                                       that_thread->tid);
1162 #endif
1163                                 }
1164                         }
1165                 }
1166 #ifdef WITH_THREADLOG
1167                 else {
1168                         CtdlLogPrintf(CTDL_DEBUG,
1169                                       "Thread \"%s\" will start in %ld seconds.\n",
1170                                       that_thread->name,
1171                                       that_thread->when - time(NULL));
1172                 }
1173 #endif
1174         }
1175         end_critical_section(S_SCHEDULE_LIST);
1176 }
1177
1178
1179 /*
1180  * A warapper function for select so we can show a thread as blocked
1181  */
1182 int CtdlThreadSelect(int n, fd_set * readfds, fd_set * writefds,
1183                      fd_set * exceptfds, struct timeval *timeout)
1184 {
1185         int ret;
1186
1187         ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1188         ret = select(n, readfds, writefds, exceptfds, timeout);
1189         ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
1190         return ret;
1191 }
1192
1193
1194
1195 void *new_worker_thread(void *arg);
1196 extern void close_masters(void);
1197 void *select_on_master(void *args);
1198 void *select_on_client(void *args);
1199 CtdlThreadNode *client_select_thread;
1200 CtdlThreadNode *master_select_thread;
1201
1202 void go_threading(void)
1203 {
1204         int i;
1205         CtdlThreadNode *last_worker;
1206
1207         /*
1208          * Initialise the thread system
1209          */
1210         ctdl_thread_internal_init();
1211
1212         /* Second call to module init functions now that threading is up */
1213         initialise_modules(1);
1214         
1215         
1216                                 CtdlThreadCreate("House keeping",
1217                                                  CTDLTHREAD_BIGSTACK,
1218                                                  do_housekeeping, NULL);
1219
1220                                                  
1221 #ifdef NEW_WORKER
1222         master_select_thread = CtdlThreadCreate ("Select on Master", 0, select_on_master, NULL);
1223         client_select_thread = CtdlThreadCreate ("Select on client", 0, select_on_client, NULL);
1224 #endif
1225
1226         /*
1227          * This thread is now used for garbage collection of other threads in the thread list
1228          */
1229         CtdlLogPrintf(CTDL_INFO,
1230                       "Startup thread %d becoming garbage collector,\n",
1231                       citthread_self());
1232
1233         /*
1234          * We do a lot of locking and unlocking of the thread list in here.
1235          * We do this so that we can repeatedly release time for other threads
1236          * that may be waiting on the thread list.
1237          * We are a low priority thread so we can afford to do this
1238          */
1239
1240         while (CtdlThreadGetCount()) {
1241                 if (CT->signal)
1242                         exit_signal = CT->signal;
1243                 if (exit_signal) {
1244                         CtdlThreadStopAll();
1245 //                      close_masters();
1246                 }
1247                 check_sched_shutdown();
1248                 if (CT->state > CTDL_THREAD_STOP_REQ) {
1249                         begin_critical_section(S_THREAD_LIST);
1250                         ctdl_thread_internal_calc_loadavg();
1251                         end_critical_section(S_THREAD_LIST);
1252
1253                         ctdl_thread_internal_check_scheduled(); /* start scheduled threads */
1254                 }
1255
1256                 /* Reduce the size of the worker thread pool if necessary. */
1257                 if ((CtdlThreadGetWorkers() > config.c_min_workers + 1)
1258                     && (CtdlThreadWorkerAvg < 20)
1259                     && (CT->state > CTDL_THREAD_STOP_REQ)) {
1260                         /* Ask a worker thread to stop as we no longer need it */
1261                         begin_critical_section(S_THREAD_LIST);
1262                         last_worker = CtdlThreadList;
1263                         while (last_worker) {
1264                                 citthread_mutex_lock(&last_worker->
1265                                                      ThreadMutex);
1266                                 if (last_worker->flags & CTDLTHREAD_WORKER
1267                                     && (last_worker->state >
1268                                         CTDL_THREAD_STOPPING)
1269                                     && (last_worker->Context == NULL)) {
1270                                         citthread_mutex_unlock
1271                                             (&last_worker->ThreadMutex);
1272                                         break;
1273                                 }
1274                                 citthread_mutex_unlock(&last_worker->
1275                                                        ThreadMutex);
1276                                 last_worker = last_worker->next;
1277                         }
1278                         end_critical_section(S_THREAD_LIST);
1279                         if (last_worker) {
1280 #ifdef WITH_THREADLOG
1281                                 CtdlLogPrintf(CTDL_DEBUG,
1282                                               "Thread system, stopping excess worker thread \"%s\" (%ld).\n",
1283                                               last_worker->name,
1284                                               last_worker->tid);
1285 #endif
1286                                 CtdlThreadStop(last_worker);
1287                         }
1288                 }
1289
1290                 /*
1291                  * If all our workers are working hard, start some more to help out
1292                  * with things
1293                  */
1294                 /* FIXME: come up with a better way to dynamically alter the number of threads
1295                  * based on the system load
1296                  */
1297 #ifdef NEW_WORKER
1298                 if ((((CtdlThreadGetWorkers() < config.c_max_workers)
1299                       && (CtdlThreadGetWorkers() <= num_sessions))
1300                      || CtdlThreadGetWorkers() < config.c_min_workers)
1301                     && (CT->state > CTDL_THREAD_STOP_REQ))
1302 #else
1303                 if ((((CtdlThreadGetWorkers() < config.c_max_workers)
1304                       && (CtdlThreadGetWorkerAvg() > 60)
1305                       && (CtdlThreadGetLoadAvg() < 90))
1306                      || CtdlThreadGetWorkers() < config.c_min_workers)
1307                     && (CT->state > CTDL_THREAD_STOP_REQ))
1308 #endif                          /* NEW_WORKER */
1309                 {
1310                         for (i = 0; i < 5; i++) {
1311 #ifdef NEW_WORKER
1312                                 CtdlThreadCreate("Worker Thread (new)",
1313                                                  CTDLTHREAD_BIGSTACK +
1314                                                  CTDLTHREAD_WORKER,
1315                                                  new_worker_thread, NULL);
1316 #else
1317                                 CtdlThreadCreate("Worker Thread",
1318                                                  CTDLTHREAD_BIGSTACK +
1319                                                  CTDLTHREAD_WORKER,
1320                                                  worker_thread, NULL);
1321 #endif                          /* NEW_WORKER */
1322                         }
1323                 }
1324
1325                 CtdlThreadGC();
1326
1327                 if (CtdlThreadGetCount() <= 1)  // Shutting down clean up the garbage collector
1328                 {
1329                         CtdlThreadGC();
1330                 }
1331
1332                 if (CtdlThreadGetCount())
1333                         CtdlThreadSleep(1);
1334         }
1335         /*
1336          * If the above loop exits we must be shutting down since we obviously have no threads
1337          */
1338         ctdl_thread_internal_cleanup();
1339 }
1340
1341
1342
1343
1344
1345 /*
1346  * Starting a new implimentation of a worker thread.
1347  * This new implimentation will be faster and do more work per thread.
1348  */
1349
1350 // TODO: need to sort out the thread states and signals
1351 // TODO: slect_on_master should not be a big stack thread.
1352 // TODO: slect_on_client should not be a big stack thread.
1353 // TODO: select_on_master is not a worker thread and should be blocked when in select
1354 // TODO: select_on_client is not a worker thread and should be blocked when in select
1355 /**
1356  * Select on master socket.
1357  * One specific thread comes in here and never leaves.
1358  * This thread blocks on select until something happens.
1359  * The select only returns if a new connection is made or the select is interrupted by some means.
1360  * We need to interrupt the select if the list of ServiceHook's changes or we are shutting down.
1361  * We should probably use a signal to interrupt the select is a ServiceHook is created.
1362  * When a ServiceHook is destroyed its socket will close which will awaken the select.
1363  */
1364 void *select_on_master(void *arg)
1365 {
1366         fd_set readfds;
1367         struct ServiceFunctionHook *serviceptr;
1368         int ssock;              /* Descriptor for client socket */
1369         int highest;
1370         int m, i;
1371         int retval = 0;
1372         struct CitContext *con;
1373
1374
1375
1376         while (!CtdlThreadCheckStop()) {
1377                 CtdlThreadName("select_on_master");
1378
1379                 /* Initialize the fdset. */
1380                 FD_ZERO(&readfds);
1381                 highest = 0;
1382
1383                 /* First, add the various master sockets to the fdset. */
1384                 for (serviceptr = ServiceHookTable; serviceptr != NULL;
1385                      serviceptr = serviceptr->next) {
1386                         m = serviceptr->msock;
1387                         FD_SET(m, &readfds);
1388                         if (m > highest) {
1389                                 highest = m;
1390                         }
1391                 }
1392
1393         /** We can block indefinately since something will wake us up eventually
1394          * Even if it is a signal telling us the system is shutting down
1395          */
1396                 retval =
1397                     CtdlThreadSelect(highest + 1, &readfds, NULL, NULL,
1398                                      NULL);
1399
1400         /** Select got an error or we are shutting down so get out */
1401                 if (retval == 0 || CtdlThreadCheckStop()) {
1402                         return NULL;
1403                 }
1404
1405         /** Select says something happened on one of our master sockets so now we handle it */
1406                 for (serviceptr = ServiceHookTable; serviceptr != NULL;
1407                      serviceptr = serviceptr->next) {
1408                         if (FD_ISSET(serviceptr->msock, &readfds)) {
1409                                 ssock = accept(serviceptr->msock, NULL, 0);
1410                                 if (ssock >= 0) {
1411                                         CtdlLogPrintf(CTDL_DEBUG,
1412                                                       "New client socket %d\n",
1413                                                       ssock);
1414                                         /* The master socket is non-blocking but the client
1415                                          * sockets need to be blocking, otherwise certain
1416                                          * operations barf on FreeBSD.  Not a fatal error.
1417                                          */
1418                                         if (fcntl(ssock, F_SETFL, 0) < 0) {
1419                                                 CtdlLogPrintf(CTDL_EMERG,
1420                                                               "citserver: Can't set socket to blocking: %s\n",
1421                                                               strerror
1422                                                               (errno));
1423                                         }
1424                                         /* New context will be created already
1425                                          * set up in the CON_EXECUTING state.
1426                                          */
1427                                         con = CreateNewContext();
1428                                         /* Assign our new socket number to it. */
1429                                         con->client_socket = ssock;
1430                                         con->h_command_function =
1431                                             serviceptr->h_command_function;
1432                                         con->h_async_function =
1433                                             serviceptr->h_async_function;
1434                                         con->ServiceName =
1435                                             serviceptr->ServiceName;
1436                                         con->h_greeting_function = serviceptr->h_greeting_function;
1437                                         /* Determine whether it's a local socket */
1438                                         if (serviceptr->sockpath != NULL)
1439                                                 con->is_local_socket = 1;
1440
1441                                         /* Set the SO_REUSEADDR socket option */
1442                                         i = 1;
1443                                         setsockopt(ssock, SOL_SOCKET,
1444                                                    SO_REUSEADDR, &i,
1445                                                    sizeof(i));
1446
1447                         /** Now we can pass this context to an idle worker thread to get things going
1448                          * What if there are no idle workers?
1449                          * We could create one but what if the thread list is full?
1450                          * Then I guess we need to close the socket a reject the connection.
1451                          */
1452                         /** TODO: If there are no idle threads then this server is overloaded and we should reject the connection
1453                          * This will have the effect of throttling the incomming connections on master sockets
1454                          * a little and slow the process down.
1455                          */
1456 //                                      if (idle_workers)
1457                                         {
1458                                                 con->state = CON_START;
1459                                                 citthread_kill(client_select_thread->tid, SIGUSR1);
1460                                                 citthread_cond_signal(&worker_block);
1461                                         }
1462                                         // else
1463                                         // output try later message
1464                                         //start_context(con);
1465                                 }
1466                         }
1467                 }
1468         }
1469         return NULL;
1470 }
1471
1472
1473
1474 /*
1475  * Select on client socket.
1476  * Only one dedicated thread in here.
1477  * We have to interrupt our select whenever a context is returned to the CON_READY state.
1478  * as a result each context may be close to timing out its client so we have to calculate
1479  * which client socket will timeout first and expire our select on that time.
1480  * 
1481  */
1482 void *select_on_client(void *arg)
1483 {
1484         fd_set readfds;
1485         struct timeval tv, now, result;
1486         int retval = 0;
1487         int highest;
1488         struct CitContext *ptr;
1489
1490         CtdlThreadName("select_on_client");
1491
1492         while (!CtdlThreadCheckStop()) {
1493                 /* Initialise the fdset */
1494                 FD_ZERO(&readfds);
1495                 highest = 0;
1496                 /** Get the clients to select on */
1497                 tv.tv_sec = config.c_sleeping;
1498                 tv.tv_usec = 0;
1499                 begin_critical_section(S_SESSION_TABLE);
1500                 for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
1501                         if (ptr->state == CON_IDLE) {
1502                                 gettimeofday(&now, NULL);
1503                                 timersub(&(ptr->client_expires_at),
1504                                          &now, &result);
1505                                 if (result.tv_sec <= 0) {
1506                                         /** This client has timed out so kill it */
1507                                         ptr->kill_me = 1;
1508                                         continue;
1509                                 }
1510                                 /** Is this one going to expire first? */
1511                                 timersub(&result, &tv, &now);
1512                                 if (now.tv_sec <= 0 && now.tv_usec <= 0) {
1513                                         tv.tv_sec = result.tv_sec;
1514                                         tv.tv_usec = result.tv_usec;
1515                                 }
1516                                 FD_SET(ptr->client_socket, &readfds);
1517                                 if (ptr->client_socket > highest)
1518                                         highest = ptr->client_socket;
1519                         }
1520                 }
1521                 end_critical_section(S_SESSION_TABLE);
1522
1523                 /* Now we can select on any connections that are waiting */
1524                 if (!CtdlThreadCheckStop()) {
1525                         retval =
1526                             CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv);
1527                 } else {        /* Shutting down? */
1528
1529                         return NULL;
1530                 }
1531
1532
1533                 /* Now figure out who made this select() unblock.
1534                  * First, check for an error or exit condition.
1535                  */
1536                 if (retval < 0) {
1537                         if (errno == EBADF) {
1538                                 CtdlLogPrintf(CTDL_NOTICE,
1539                                               "select() failed: (%s)\n",
1540                                               strerror(errno));
1541                         }
1542                         if (errno != EINTR) {
1543                                 CtdlLogPrintf(CTDL_EMERG,
1544                                               "Exiting (%s)\n",
1545                                               strerror(errno));
1546                                 CtdlThreadStopAll();
1547                         } else if (!CtdlThreadCheckStop()) {
1548                                 CtdlLogPrintf(CTDL_DEBUG,
1549                                               "Un handled select failure.\n");
1550                         }
1551                 } else if (retval > 0) {
1552                         begin_critical_section(S_SESSION_TABLE);
1553                         for (ptr = ContextList; ptr != NULL;
1554                              ptr = ptr->next) {
1555                                 if ((FD_ISSET
1556                                      (ptr->client_socket, &readfds))
1557                                     && (ptr->state == CON_IDLE)) {
1558                                         ptr->input_waiting = 1;
1559                                         ptr->state = CON_READY;
1560                                         /** reset client expire time */
1561                                         ptr->client_expires_at.tv_sec = config.c_sleeping;
1562                                         ptr->client_expires_at.tv_usec = 0;
1563                                 }
1564                         }
1565                         end_critical_section(S_SESSION_TABLE);
1566                 }
1567         }
1568         return NULL;
1569 }
1570
1571
1572
1573 /*
1574  * Do the worker threads work when needed
1575  */
1576 int execute_session(struct CitContext *bind_me)
1577 {
1578         int force_purge;
1579
1580         become_session(bind_me);
1581
1582         /* If the client has sent a command, execute it. */
1583         if (CC->input_waiting) {
1584                 CC->h_command_function();
1585                 CC->input_waiting = 0;
1586         }
1587
1588         /* If there are asynchronous messages waiting and the
1589          * client supports it, do those now */
1590         if ((CC->is_async) && (CC->async_waiting)
1591             && (CC->h_async_function != NULL)) {
1592                 CC->h_async_function();
1593                 CC->async_waiting = 0;
1594         }
1595
1596         force_purge = CC->kill_me;
1597         if (force_purge)
1598                 CT->Context = NULL;
1599         become_session(NULL);
1600         bind_me->state = CON_IDLE;
1601         return force_purge;
1602 }
1603
1604
1605
1606 extern void dead_session_purge(int force);
1607
1608 /*
1609  * A new worker_thread loop.
1610  */
1611
1612 void *new_worker_thread(void *arg)
1613 {
1614         struct CitContext *bind_me, *ptr;
1615         int force_purge;
1616
1617         while (!CtdlThreadCheckStop()) {
1618
1619                 /* make doubly sure we're not holding any stale db handles
1620                  * which might cause a deadlock.
1621                  */
1622                 cdb_check_handles();
1623                 force_purge = 0;
1624                 bind_me = NULL; /* Which session shall we handle? */
1625
1626                 begin_critical_section(S_SESSION_TABLE);
1627                 for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
1628                         if (ptr->state == CON_START) {
1629                                 ptr->state = CON_EXECUTING;
1630                                 end_critical_section(S_SESSION_TABLE);
1631                                 become_session(ptr);
1632                                 begin_session(ptr);
1633                                 ptr->h_greeting_function();
1634                                 become_session(NULL);
1635                                 ptr->state = CON_IDLE;
1636                                 break;
1637                         }
1638                         if (ptr->state == CON_READY) {
1639                                 ptr->state = CON_EXECUTING;
1640                                 end_critical_section(S_SESSION_TABLE);
1641                                 force_purge = execute_session(ptr);
1642                                 break;
1643                         }
1644
1645                 }
1646                 end_critical_section(S_SESSION_TABLE);
1647
1648                 dead_session_purge(force_purge);
1649                 
1650                 /** block the worker threads waiting for a select to do something */
1651                 idle_workers++;
1652                 ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1653                 citthread_mutex_lock(&worker_block_mutex);
1654                 citthread_cond_wait(&worker_block, &worker_block_mutex);
1655                 citthread_mutex_unlock(&worker_block_mutex);
1656                 ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
1657                 idle_workers--;
1658                 
1659                 if (CtdlThreadCheckStop())
1660                         break;
1661
1662         }
1663         return NULL;
1664 }