do_housekeeping is now a separate thread. This releases worker threads
[citadel.git] / citadel / threads.c
1 /*
2  * $Id: sysdep.c 5882 2007-12-13 19:46:05Z davew $
3  *
4  * Citadel "system dependent" stuff.
5  * See copyright.txt for copyright information.
6  *
7  * Here's where we have the Citadel thread implementation
8  *
9  */
10
11 #include <errno.h>
12 #include <sys/socket.h>
13 #include <unistd.h>
14 #include <fcntl.h>
15 #include <signal.h>
16
17 #if TIME_WITH_SYS_TIME
18 # include <sys/time.h>
19 # include <time.h>
20 #else
21 # if HAVE_SYS_TIME_H
22 #  include <sys/time.h>
23 # else
24 #  include <time.h>
25 # endif
26 #endif
27
28 #include "threads.h"
29 #include "ctdl_module.h"
30 #include "modules_init.h"
31 #include "housekeeping.h"
32 #include "config.h"
33 #include "citserver.h"
34 #include "sysdep_decls.h"
35
36 /*
37  * define this to use the new worker_thread method of handling connections
38  */
39 //#define NEW_WORKER
40
41 /*
42  * New thread interface.
43  * To create a thread you must call one of the create thread functions.
44  * You must pass it the address of (a pointer to a CtdlThreadNode initialised to NULL) like this
45  * struct CtdlThreadNode *node = NULL;
46  * pass in &node
47  * If the thread is created *node will point to the thread control structure for the created thread.
48  * If the thread creation fails *node remains NULL
49  * Do not free the memory pointed to by *node, it doesn't belong to you.
50  * This new interface duplicates much of the eCrash stuff. We should go for closer integration since that would
51  * remove the need for the calls to eCrashRegisterThread and friends
52  */
53
54 static int num_threads = 0;     /* Current number of threads */
55 static int num_workers = 0;     /* Current number of worker threads */
56
57 CtdlThreadNode *CtdlThreadList = NULL;
58 CtdlThreadNode *CtdlThreadSchedList = NULL;
59
60 static citthread_t GC_thread;
61 static char *CtdlThreadStates[CTDL_THREAD_LAST_STATE];
62 double CtdlThreadLoadAvg = 0;
63 double CtdlThreadWorkerAvg = 0;
64 citthread_key_t ThreadKey;
65
66 citthread_mutex_t Critters[MAX_SEMAPHORES];     /* Things needing locking */
67
68 int idle_workers = 0;
69 citthread_cond_t worker_block;
70 citthread_mutex_t worker_block_mutex;
71
72
73 void InitialiseSemaphores(void)
74 {
75         int i;
76
77         /* Set up a bunch of semaphores to be used for critical sections */
78         for (i = 0; i < MAX_SEMAPHORES; ++i) {
79                 citthread_mutex_init(&Critters[i], NULL);
80         }
81 }
82
83
84
85
86 /*
87  * Obtain a semaphore lock to begin a critical section.
88  * but only if no one else has one
89  */
90 int try_critical_section(int which_one)
91 {
92         /* For all types of critical sections except those listed here,
93          * ensure nobody ever tries to do a critical section within a
94          * transaction; this could lead to deadlock.
95          */
96         if ((which_one != S_FLOORCACHE)
97 #ifdef DEBUG_MEMORY_LEAKS
98             && (which_one != S_DEBUGMEMLEAKS)
99 #endif
100             && (which_one != S_RPLIST)
101             ) {
102                 cdb_check_handles();
103         }
104         return (citthread_mutex_trylock(&Critters[which_one]));
105 }
106
107
108 /*
109  * Obtain a semaphore lock to begin a critical section.
110  */
111 void begin_critical_section(int which_one)
112 {
113         /* CtdlLogPrintf(CTDL_DEBUG, "begin_critical_section(%d)\n", which_one); */
114
115         /* For all types of critical sections except those listed here,
116          * ensure nobody ever tries to do a critical section within a
117          * transaction; this could lead to deadlock.
118          */
119         if ((which_one != S_FLOORCACHE)
120 #ifdef DEBUG_MEMORY_LEAKS
121             && (which_one != S_DEBUGMEMLEAKS)
122 #endif
123             && (which_one != S_RPLIST)
124             ) {
125                 cdb_check_handles();
126         }
127         citthread_mutex_lock(&Critters[which_one]);
128 }
129
130 /*
131  * Release a semaphore lock to end a critical section.
132  */
133 void end_critical_section(int which_one)
134 {
135         citthread_mutex_unlock(&Critters[which_one]);
136 }
137
138
/*
 * Destructor for a thread's TSD block.
 * Registered with citthread_key_create() and also called directly by
 * ctdl_thread_internal_free_tsd(). A NULL argument is ignored.
 */
static void ctdl_thread_internal_dest_tsd(void *arg)
{
        if (arg == NULL)
                return;

        check_handles(arg);
        free(arg);
}
149
150
151 /*
152  * A function to initialise the thread TSD
153  */
154 void ctdl_thread_internal_init_tsd(void)
155 {
156         int ret;
157
158         if ((ret =
159              citthread_key_create(&ThreadKey,
160                                   ctdl_thread_internal_dest_tsd))) {
161                 lprintf(CTDL_EMERG, "citthread_key_create: %s\n",
162                         strerror(ret));
163                 exit(CTDLEXIT_DB);
164         }
165         citthread_mutex_init (&worker_block_mutex, NULL);
166         citthread_cond_init(&worker_block, NULL);
167 }
168
169 /*
170  * Ensure that we have a key for thread-specific data. 
171  *
172  * This should be called immediately after startup by any thread 
173  * 
174  */
175 void CtdlThreadAllocTSD(void)
176 {
177         ThreadTSD *tsd;
178
179         if (citthread_getspecific(ThreadKey) != NULL)
180                 return;
181
182         tsd = malloc(sizeof(ThreadTSD));
183
184         tsd->tid = NULL;
185
186         memset(tsd->cursors, 0, sizeof tsd->cursors);
187         tsd->self = NULL;
188
189         citthread_setspecific(ThreadKey, tsd);
190 }
191
192
193 void ctdl_thread_internal_free_tsd(void)
194 {
195         ctdl_thread_internal_dest_tsd(citthread_getspecific(ThreadKey));
196         citthread_setspecific(ThreadKey, NULL);
197 }
198
199
200 void ctdl_thread_internal_cleanup(void)
201 {
202         int i;
203         CtdlThreadNode *this_thread, *that_thread;
204
205         for (i = 0; i < CTDL_THREAD_LAST_STATE; i++) {
206                 free(CtdlThreadStates[i]);
207         }
208
209         /* Clean up the scheduled thread list */
210         this_thread = CtdlThreadSchedList;
211         while (this_thread) {
212                 that_thread = this_thread;
213                 this_thread = this_thread->next;
214                 citthread_mutex_destroy(&that_thread->ThreadMutex);
215                 citthread_cond_destroy(&that_thread->ThreadCond);
216                 citthread_mutex_destroy(&that_thread->SleepMutex);
217                 citthread_cond_destroy(&that_thread->SleepCond);
218                 citthread_attr_destroy(&that_thread->attr);
219                 free(that_thread);
220         }
221         ctdl_thread_internal_free_tsd();
222 }
223
224 void ctdl_thread_internal_init(void)
225 {
226         CtdlThreadNode *this_thread;
227         int ret = 0;
228
229         GC_thread = citthread_self();
230         CtdlThreadStates[CTDL_THREAD_INVALID] = strdup("Invalid Thread");
231         CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread");
232         CtdlThreadStates[CTDL_THREAD_CREATE] =
233             strdup("Thread being Created");
234         CtdlThreadStates[CTDL_THREAD_CANCELLED] =
235             strdup("Thread Cancelled");
236         CtdlThreadStates[CTDL_THREAD_EXITED] = strdup("Thread Exited");
237         CtdlThreadStates[CTDL_THREAD_STOPPING] = strdup("Thread Stopping");
238         CtdlThreadStates[CTDL_THREAD_STOP_REQ] =
239             strdup("Thread Stop Requested");
240         CtdlThreadStates[CTDL_THREAD_SLEEPING] = strdup("Thread Sleeping");
241         CtdlThreadStates[CTDL_THREAD_RUNNING] = strdup("Thread Running");
242         CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked");
243
244         /* Get ourself a thread entry */
245         this_thread = malloc(sizeof(CtdlThreadNode));
246         if (this_thread == NULL) {
247                 CtdlLogPrintf(CTDL_EMERG,
248                               "Thread system, can't allocate CtdlThreadNode, exiting\n");
249                 return;
250         }
251         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
252         memset(this_thread, 0, sizeof(CtdlThreadNode));
253
254         citthread_mutex_init(&(this_thread->ThreadMutex), NULL);
255         citthread_cond_init(&(this_thread->ThreadCond), NULL);
256         citthread_mutex_init(&(this_thread->SleepMutex), NULL);
257         citthread_cond_init(&(this_thread->SleepCond), NULL);
258
259         /* We are garbage collector so create us as running */
260         this_thread->state = CTDL_THREAD_RUNNING;
261
262         if ((ret = citthread_attr_init(&this_thread->attr))) {
263                 CtdlLogPrintf(CTDL_EMERG,
264                               "Thread system, citthread_attr_init: %s\n",
265                               strerror(ret));
266                 free(this_thread);
267                 return;
268         }
269
270         this_thread->name = "Garbage Collection Thread";
271
272         this_thread->tid = GC_thread;
273         CT = this_thread;
274
275         num_threads++;          // Increase the count of threads in the system.
276
277         this_thread->next = CtdlThreadList;
278         CtdlThreadList = this_thread;
279         if (this_thread->next)
280                 this_thread->next->prev = this_thread;
281         /* Set up start times */
282         gettimeofday(&this_thread->start_time, NULL);   /* Time this thread started */
283         memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof(struct timeval));      /* Changed state so mark it. */
284 }
285
286
287 /*
288  * A function to update a threads load averages
289  */
290 void ctdl_thread_internal_update_avgs(CtdlThreadNode * this_thread)
291 {
292         struct timeval now, result;
293         double last_duration;
294
295         gettimeofday(&now, NULL);
296         timersub(&now, &(this_thread->last_state_change), &result);
297         /* I don't think these mutex's are needed here */
298         citthread_mutex_lock(&this_thread->ThreadMutex);
299         // result now has a timeval for the time we spent in the last state since we last updated
300         last_duration =
301             (double) result.tv_sec +
302             ((double) result.tv_usec / (double) 1000000);
303         if (this_thread->state == CTDL_THREAD_SLEEPING)
304                 this_thread->avg_sleeping += last_duration;
305         if (this_thread->state == CTDL_THREAD_RUNNING)
306                 this_thread->avg_running += last_duration;
307         if (this_thread->state == CTDL_THREAD_BLOCKED)
308                 this_thread->avg_blocked += last_duration;
309         memcpy(&this_thread->last_state_change, &now,
310                sizeof(struct timeval));
311         citthread_mutex_unlock(&this_thread->ThreadMutex);
312 }
313
314 /*
315  * A function to chenge the state of a thread
316  */
317 void ctdl_thread_internal_change_state(CtdlThreadNode * this_thread,
318                                        enum CtdlThreadState new_state)
319 {
320         /*
321          * Wether we change state or not we need update the load values
322          */
323         ctdl_thread_internal_update_avgs(this_thread);
324         /* This mutex not needed here? */
325         citthread_mutex_lock(&this_thread->ThreadMutex);        /* To prevent race condition of a sleeping thread */
326         if ((new_state == CTDL_THREAD_STOP_REQ)
327             && (this_thread->state > CTDL_THREAD_STOP_REQ))
328                 this_thread->state = new_state;
329         if (((new_state == CTDL_THREAD_SLEEPING)
330              || (new_state == CTDL_THREAD_BLOCKED))
331             && (this_thread->state == CTDL_THREAD_RUNNING))
332                 this_thread->state = new_state;
333         if ((new_state == CTDL_THREAD_RUNNING)
334             && ((this_thread->state == CTDL_THREAD_SLEEPING)
335                 || (this_thread->state == CTDL_THREAD_BLOCKED)))
336                 this_thread->state = new_state;
337         citthread_mutex_unlock(&this_thread->ThreadMutex);
338 }
339
340
341 /*
342  * A function to tell all threads to exit
343  */
344 void CtdlThreadStopAll(void)
345 {
346         //FIXME: The signalling of the condition should not be in the critical_section
347         // We need to build a list of threads we are going to signal and then signal them afterwards
348
349         CtdlThreadNode *this_thread;
350
351         begin_critical_section(S_THREAD_LIST);
352         this_thread = CtdlThreadList;
353         while (this_thread) {
354 #ifdef THREADS_USESIGNALS
355                 citthread_killl(this_thread->tid, SIGHUP);
356 #endif
357                 citthread_kill(this_thread->tid, SIGUSR1);
358                 ctdl_thread_internal_change_state(this_thread,
359                                                   CTDL_THREAD_STOP_REQ);
360                 citthread_cond_signal(&this_thread->ThreadCond);
361                 citthread_cond_signal(&this_thread->SleepCond);
362                 CtdlLogPrintf(CTDL_DEBUG,
363                               "Thread system stopping thread \"%s\" (%ld).\n",
364                               this_thread->name, this_thread->tid);
365                 this_thread = this_thread->next;
366         }
367         end_critical_section(S_THREAD_LIST);
368         citthread_cond_broadcast(&worker_block);
369 }
370
371
372 /*
373  * A function to wake up all sleeping threads
374  */
375 void CtdlThreadWakeAll(void)
376 {
377         CtdlThreadNode *this_thread;
378
379         CtdlLogPrintf(CTDL_DEBUG, "Thread system waking all threads.\n");
380
381         begin_critical_section(S_THREAD_LIST);
382         this_thread = CtdlThreadList;
383         while (this_thread) {
384                 if (!this_thread->thread_func) {
385                         citthread_cond_signal(&this_thread->ThreadCond);
386                         citthread_cond_signal(&this_thread->SleepCond);
387                 }
388                 this_thread = this_thread->next;
389         }
390         end_critical_section(S_THREAD_LIST);
391 }
392
393
394 /*
395  * A function to return the number of threads running in the system
396  */
397 int CtdlThreadGetCount(void)
398 {
399         return num_threads;
400 }
401
402 int CtdlThreadGetWorkers(void)
403 {
404         return num_workers;
405 }
406
407 double CtdlThreadGetWorkerAvg(void)
408 {
409         double ret;
410
411         begin_critical_section(S_THREAD_LIST);
412         ret = CtdlThreadWorkerAvg;
413         end_critical_section(S_THREAD_LIST);
414         return ret;
415 }
416
417 double CtdlThreadGetLoadAvg(void)
418 {
419         double ret;
420
421         begin_critical_section(S_THREAD_LIST);
422         ret = CtdlThreadLoadAvg;
423         end_critical_section(S_THREAD_LIST);
424         return ret;
425 }
426
427
428
429
430 /*
431  * A function to rename a thread
432  * Returns a const char *
433  */
434 const char *CtdlThreadName(const char *name)
435 {
436         const char *old_name;
437
438         if (!CT) {
439                 CtdlLogPrintf(CTDL_WARNING,
440                               "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n",
441                               name);
442                 return NULL;
443         }
444         old_name = CT->name;
445         if (name)
446                 CT->name = name;
447         return (old_name);
448 }
449
450
451 /*
452  * A function to force a thread to exit
453  */
454 void CtdlThreadCancel(CtdlThreadNode * thread)
455 {
456         CtdlThreadNode *this_thread;
457
458         if (!thread)
459                 this_thread = CT;
460         else
461                 this_thread = thread;
462         if (!this_thread) {
463                 CtdlLogPrintf(CTDL_EMERG,
464                               "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n");
465                 CtdlThreadStopAll();
466                 return;
467         }
468
469         if (!this_thread->thread_func) {
470                 CtdlLogPrintf(CTDL_EMERG,
471                               "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n");
472                 CtdlThreadStopAll();
473                 return;
474         }
475
476         ctdl_thread_internal_change_state(this_thread,
477                                           CTDL_THREAD_CANCELLED);
478         citthread_cancel(this_thread->tid);
479 }
480
481
482 /*
483  * A function for a thread to check if it has been asked to stop
484  */
485 int CtdlThreadCheckStop(void)
486 {
487         int state;
488
489         if (!CT) {
490                 CtdlLogPrintf(CTDL_EMERG,
491                               "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n");
492                 CtdlThreadStopAll();
493                 return -1;
494         }
495
496         state = CT->state;
497
498 #ifdef THREADS_USERSIGNALS
499         if (CT->signal)
500                 CtdlLogPrintf(CTDL_DEBUG,
501                               "Thread \"%s\" caught signal %d.\n",
502                               CT->name, CT->signal);
503 #endif
504         if (state == CTDL_THREAD_STOP_REQ) {
505                 CT->state = CTDL_THREAD_STOPPING;
506                 return -1;
507         } else if ((state < CTDL_THREAD_STOP_REQ)
508                    && (state > CTDL_THREAD_CREATE)) {
509                 return -1;
510         }
511         return 0;
512 }
513
514
515 /*
516  * A function to ask a thread to exit
517  * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit
518  */
519 void CtdlThreadStop(CtdlThreadNode * thread)
520 {
521         CtdlThreadNode *this_thread;
522
523         if (!thread)
524                 this_thread = CT;
525         else
526                 this_thread = thread;
527         if (!this_thread)
528                 return;
529         if (!(this_thread->thread_func))
530                 return;         // Don't stop garbage collector
531 #ifdef THREADS_USESIGNALS
532         citthread_kill(this_thread->tid, SIGHUP);
533 #endif
534         ctdl_thread_internal_change_state(this_thread,
535                                           CTDL_THREAD_STOP_REQ);
536         citthread_cond_signal(&this_thread->ThreadCond);
537         citthread_cond_signal(&this_thread->SleepCond);
538 }
539
540 /*
541  * So we now have a sleep command that works with threads but it is in seconds
542  */
543 void CtdlThreadSleep(int secs)
544 {
545         struct timespec wake_time;
546         struct timeval time_now;
547
548
549         if (!CT) {
550                 CtdlLogPrintf(CTDL_WARNING,
551                               "CtdlThreadSleep() called by something that is not a thread. Should we die?\n");
552                 return;
553         }
554
555         memset(&wake_time, 0, sizeof(struct timespec));
556         gettimeofday(&time_now, NULL);
557         wake_time.tv_sec = time_now.tv_sec + secs;
558         wake_time.tv_nsec = time_now.tv_usec * 10;
559
560         ctdl_thread_internal_change_state(CT, CTDL_THREAD_SLEEPING);
561
562         citthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */
563         citthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex,
564                                  &wake_time);
565         citthread_mutex_unlock(&CT->ThreadMutex);
566
567         ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
568 }
569
570
571 /*
572  * Routine to clean up our thread function on exit
573  */
574 static void ctdl_internal_thread_cleanup(void *arg)
575 {
576         /*
577          * In here we were called by the current thread because it is exiting
578          * NB. WE ARE THE CURRENT THREAD
579          */
580         CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n",
581                       CT->name, CT->tid);
582
583 #ifdef HAVE_BACKTRACE
584         eCrash_UnregisterThread();
585 #endif
586
587         citthread_mutex_lock(&CT->ThreadMutex);
588         CT->state = CTDL_THREAD_EXITED; // needs to be last thing else house keeping will unlink us too early
589         citthread_mutex_unlock(&CT->ThreadMutex);
590 }
591
592 /*
593  * A quick function to show the load averages
594  */
595 void ctdl_thread_internal_calc_loadavg(void)
596 {
597         CtdlThreadNode *that_thread;
598         double load_avg, worker_avg;
599         int workers = 0;
600
601         that_thread = CtdlThreadList;
602         load_avg = 0;
603         worker_avg = 0;
604         while (that_thread) {
605                 /* Update load averages */
606                 ctdl_thread_internal_update_avgs(that_thread);
607                 citthread_mutex_lock(&that_thread->ThreadMutex);
608                 that_thread->load_avg =
609                     (that_thread->avg_sleeping +
610                      that_thread->avg_running) /
611                     (that_thread->avg_sleeping + that_thread->avg_running +
612                      that_thread->avg_blocked) * 100;
613                 that_thread->avg_sleeping /= 2;
614                 that_thread->avg_running /= 2;
615                 that_thread->avg_blocked /= 2;
616                 load_avg += that_thread->load_avg;
617                 if (that_thread->flags & CTDLTHREAD_WORKER) {
618                         worker_avg += that_thread->load_avg;
619                         workers++;
620                 }
621 #ifdef WITH_THREADLOG
622                 CtdlLogPrintf(CTDL_DEBUG,
623                               "CtdlThread, \"%s\" (%lu) \"%s\" %.2f %.2f %.2f %.2f\n",
624                               that_thread->name, that_thread->tid,
625                               CtdlThreadStates[that_thread->state],
626                               that_thread->avg_sleeping,
627                               that_thread->avg_running,
628                               that_thread->avg_blocked,
629                               that_thread->load_avg);
630 #endif
631                 citthread_mutex_unlock(&that_thread->ThreadMutex);
632                 that_thread = that_thread->next;
633         }
634         CtdlThreadLoadAvg = load_avg / num_threads;
635         CtdlThreadWorkerAvg = worker_avg / workers;
636 #ifdef WITH_THREADLOG
637         CtdlLogPrintf(CTDL_INFO,
638                       "System load average %.2f, workers averag %.2f, threads %d, workers %d, sessions %d\n",
639                       CtdlThreadLoadAvg, CtdlThreadWorkerAvg, num_threads,
640                       num_workers, num_sessions);
641 #endif
642 }
643
644
645 /*
646  * Garbage collection routine.
647  * Gets called by main() in a loop to clean up the thread list periodically.
648  */
649 void CtdlThreadGC(void)
650 {
651         CtdlThreadNode *this_thread, *that_thread;
652         int workers = 0, sys_workers;
653         int ret = 0;
654
655         begin_critical_section(S_THREAD_LIST);
656
657         /* Handle exiting of garbage collector thread */
658         if (num_threads == 1)
659                 CtdlThreadList->state = CTDL_THREAD_EXITED;
660
661 #ifdef WITH_THREADLOG
662         CtdlLogPrintf(CTDL_DEBUG,
663                       "Thread system running garbage collection.\n");
664 #endif
665         /*
666          * Woke up to do garbage collection
667          */
668         this_thread = CtdlThreadList;
669         while (this_thread) {
670                 that_thread = this_thread;
671                 this_thread = this_thread->next;
672
673                 /* Do we need to clean up this thread? */
674                 if (that_thread->state != CTDL_THREAD_EXITED) {
675                         if (that_thread->flags & CTDLTHREAD_WORKER)
676                                 workers++;      /* Sanity check on number of worker threads */
677                         continue;
678                 }
679
680                 if (citthread_equal(that_thread->tid, citthread_self()) && that_thread->thread_func) {  /* Sanity check */
681                         end_critical_section(S_THREAD_LIST);
682                         CtdlLogPrintf(CTDL_EMERG,
683                                       "Thread system PANIC, a thread is trying to clean up after itself.\n");
684                         abort();
685                         return;
686                 }
687
688                 if (num_threads <= 0) { /* Sanity check */
689                         end_critical_section(S_THREAD_LIST);
690                         CtdlLogPrintf(CTDL_EMERG,
691                                       "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n");
692                         abort();
693                         return;
694                 }
695
696                 if (that_thread->flags & CTDLTHREAD_WORKER)
697                         num_workers--;  /* This is a wroker thread so reduce the count. */
698                 num_threads--;
699                 /* If we are unlinking the list head then the next becomes the list head */
700                 if (that_thread->prev)
701                         that_thread->prev->next = that_thread->next;
702                 else
703                         CtdlThreadList = that_thread->next;
704                 if (that_thread->next)
705                         that_thread->next->prev = that_thread->prev;
706
707                 citthread_cond_signal(&that_thread->ThreadCond);
708                 citthread_cond_signal(&that_thread->SleepCond); // Make sure this thread is awake
709                 citthread_mutex_lock(&that_thread->ThreadMutex);        // Make sure it has done what its doing
710                 citthread_mutex_unlock(&that_thread->ThreadMutex);
711                 /*
712                  * Join on the thread to do clean up and prevent memory leaks
713                  * Also makes sure the thread has cleaned up after itself before we remove it from the list
714                  * We can join on the garbage collector thread the join should just return EDEADLCK
715                  */
716                 ret = citthread_join(that_thread->tid, NULL);
717                 if (ret == EDEADLK)
718                         CtdlLogPrintf(CTDL_DEBUG,
719                                       "Garbage collection on own thread.\n");
720                 else if (ret == EINVAL)
721                         CtdlLogPrintf(CTDL_DEBUG,
722                                       "Garbage collection, that thread already joined on.\n");
723                 else if (ret == ESRCH)
724                         CtdlLogPrintf(CTDL_DEBUG,
725                                       "Garbage collection, no thread to join on.\n");
726                 else if (ret != 0)
727                         CtdlLogPrintf(CTDL_DEBUG,
728                                       "Garbage collection, citthread_join returned an unknown error.\n");
729                 /*
730                  * Now we own that thread entry
731                  */
732                 CtdlLogPrintf(CTDL_INFO,
733                               "Garbage Collection for thread \"%s\" (%ld).\n",
734                               that_thread->name, that_thread->tid);
735                 citthread_mutex_destroy(&that_thread->ThreadMutex);
736                 citthread_cond_destroy(&that_thread->ThreadCond);
737                 citthread_mutex_destroy(&that_thread->SleepMutex);
738                 citthread_cond_destroy(&that_thread->SleepCond);
739                 citthread_attr_destroy(&that_thread->attr);
740                 free(that_thread);
741         }
742         sys_workers = num_workers;
743         end_critical_section(S_THREAD_LIST);
744
745         /* Sanity check number of worker threads */
746         if (workers != sys_workers) {
747                 CtdlLogPrintf(CTDL_EMERG,
748                               "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n",
749                               workers, sys_workers);
750                 abort();
751         }
752 }
753
754
755
756
757 /*
758  * Runtime function for a Citadel Thread.
759  * This initialises the threads environment and then calls the user supplied thread function
760  * Note that this is the REAL thread function and wraps the users thread function.
761  */
762 static void *ctdl_internal_thread_func(void *arg)
763 {
764         CtdlThreadNode *this_thread;
765         void *ret = NULL;
766
767         /* lock and unlock the thread list.
768          * This causes this thread to wait until all its creation stuff has finished before it
769          * can continue its execution.
770          */
771         begin_critical_section(S_THREAD_LIST);
772         this_thread = (CtdlThreadNode *) arg;
773         gettimeofday(&this_thread->start_time, NULL);   /* Time this thread started */
774 //      citthread_mutex_lock(&this_thread->ThreadMutex);
775
776         // Register the cleanup function to take care of when we exit.
777         citthread_cleanup_push(ctdl_internal_thread_cleanup, NULL);
778         // Get our thread data structure
779         CtdlThreadAllocTSD();
780         CT = this_thread;
781         this_thread->pid = getpid();
782         memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof(struct timeval));      /* Changed state so mark it. */
783         /* Only change to running state if we weren't asked to stop during the create cycle
784          * Other wise there is a window to allow this threads creation to continue to full grown and
785          * therby prevent a shutdown of the server.
786          */
787 //      citthread_mutex_unlock(&this_thread->ThreadMutex);
788
789         if (!CtdlThreadCheckStop()) {
790                 citthread_mutex_lock(&this_thread->ThreadMutex);
791                 this_thread->state = CTDL_THREAD_RUNNING;
792                 citthread_mutex_unlock(&this_thread->ThreadMutex);
793         }
794         end_critical_section(S_THREAD_LIST);
795
796         // Register for tracing
797 #ifdef HAVE_BACKTRACE
798         eCrash_RegisterThread(this_thread->name, 0);
799 #endif
800
801         // Tell the world we are here
802         CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n",
803                       this_thread->name, this_thread->tid);
804
805
806
807         /*
808          * run the thread to do the work but only if we haven't been asked to stop
809          */
810         if (!CtdlThreadCheckStop())
811                 ret = (this_thread->thread_func) (this_thread->user_args);
812
813         /*
814          * Our thread is exiting either because it wanted to end or because the server is stopping
815          * We need to clean up
816          */
817         citthread_cleanup_pop(1);       // Execute our cleanup routine and remove it
818
819         return (ret);
820 }
821
822
823
824
825 /*
826  * Function to initialise an empty thread structure
827  */
828 CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode *
829                                                  this_thread, long flags)
830 {
831         int ret = 0;
832
833         // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready.
834         memset(this_thread, 0, sizeof(CtdlThreadNode));
835
836         /* Create the mutex's early so we can use them */
837         citthread_mutex_init(&(this_thread->ThreadMutex), NULL);
838         citthread_cond_init(&(this_thread->ThreadCond), NULL);
839         citthread_mutex_init(&(this_thread->SleepMutex), NULL);
840         citthread_cond_init(&(this_thread->SleepCond), NULL);
841
842         this_thread->state = CTDL_THREAD_CREATE;
843
844         if ((ret = citthread_attr_init(&this_thread->attr))) {
845                 citthread_mutex_unlock(&this_thread->ThreadMutex);
846                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
847                 citthread_cond_destroy(&(this_thread->ThreadCond));
848                 citthread_mutex_destroy(&(this_thread->SleepMutex));
849                 citthread_cond_destroy(&(this_thread->SleepCond));
850                 CtdlLogPrintf(CTDL_EMERG,
851                               "Thread system, citthread_attr_init: %s\n",
852                               strerror(ret));
853                 free(this_thread);
854                 return NULL;
855         }
856
857         /* Our per-thread stacks need to be bigger than the default size,
858          * otherwise the MIME parser crashes on FreeBSD, and the IMAP service
859          * crashes on 64-bit Linux.
860          */
861         if (flags & CTDLTHREAD_BIGSTACK) {
862 #ifdef WITH_THREADLOG
863                 CtdlLogPrintf(CTDL_INFO,
864                               "Thread system. Creating BIG STACK thread.\n");
865 #endif
866                 if ((ret =
867                      citthread_attr_setstacksize(&this_thread->attr,
868                                                  THREADSTACKSIZE))) {
869                         citthread_mutex_unlock(&this_thread->ThreadMutex);
870                         citthread_mutex_destroy(&
871                                                 (this_thread->
872                                                  ThreadMutex));
873                         citthread_cond_destroy(&(this_thread->ThreadCond));
874                         citthread_mutex_destroy(&
875                                                 (this_thread->SleepMutex));
876                         citthread_cond_destroy(&(this_thread->SleepCond));
877                         citthread_attr_destroy(&this_thread->attr);
878                         CtdlLogPrintf(CTDL_EMERG,
879                                       "Thread system, citthread_attr_setstacksize: %s\n",
880                                       strerror(ret));
881                         free(this_thread);
882                         return NULL;
883                 }
884         }
885
886         /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the
887          * load average for the system. If we don't do this then we create a mass of threads at the same time 
888          * because the creation didn't affect the load average.
889          */
890         this_thread->avg_blocked = 2;
891
892         return (this_thread);
893 }
894
895
896
897
898 /*
899  * Internal function to create a thread.
900  */
901 CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags,
902                                             void *(*thread_func) (void
903                                                                   *arg),
904                                             void *args)
905 {
906         int ret = 0;
907         CtdlThreadNode *this_thread;
908
909         if (num_threads >= 32767) {
910                 CtdlLogPrintf(CTDL_EMERG,
911                               "Thread system. Thread list full.\n");
912                 return NULL;
913         }
914
915         this_thread = malloc(sizeof(CtdlThreadNode));
916         if (this_thread == NULL) {
917                 CtdlLogPrintf(CTDL_EMERG,
918                               "Thread system, can't allocate CtdlThreadNode, exiting\n");
919                 return NULL;
920         }
921
922         /* Initialise the thread structure */
923         if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) {
924                 free(this_thread);
925                 CtdlLogPrintf(CTDL_EMERG,
926                               "Thread system, can't initialise CtdlThreadNode, exiting\n");
927                 return NULL;
928         }
929         /*
930          * If we got here we are going to create the thread so we must initilise the structure
931          * first because most implimentations of threading can't create it in a stopped state
932          * and it might want to do things with its structure that aren't initialised otherwise.
933          */
934         if (name) {
935                 this_thread->name = name;
936         } else {
937                 this_thread->name = "Un-named Thread";
938         }
939
940         this_thread->flags = flags;
941         this_thread->thread_func = thread_func;
942         this_thread->user_args = args;
943
944 //      citthread_mutex_lock(&this_thread->ThreadMutex);
945
946         begin_critical_section(S_THREAD_LIST);
947         /*
948          * We pass this_thread into the thread as its args so that it can find out information
949          * about itself and it has a bit of storage space for itself, not to mention that the REAL
950          * thread function needs to finish off the setup of the structure
951          */
952         if ((ret =
953              citthread_create(&this_thread->tid, &this_thread->attr,
954                               ctdl_internal_thread_func,
955                               this_thread) != 0)) {
956                 end_critical_section(S_THREAD_LIST);
957                 CtdlLogPrintf(CTDL_ALERT,
958                               "Thread system, Can't create thread: %s\n",
959                               strerror(ret));
960                 citthread_mutex_unlock(&this_thread->ThreadMutex);
961                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
962                 citthread_cond_destroy(&(this_thread->ThreadCond));
963                 citthread_mutex_destroy(&(this_thread->SleepMutex));
964                 citthread_cond_destroy(&(this_thread->SleepCond));
965                 citthread_attr_destroy(&this_thread->attr);
966                 free(this_thread);
967                 return NULL;
968         }
969
970         num_threads++;          // Increase the count of threads in the system.
971         if (this_thread->flags & CTDLTHREAD_WORKER)
972                 num_workers++;
973
974         this_thread->next = CtdlThreadList;
975         CtdlThreadList = this_thread;
976         if (this_thread->next)
977                 this_thread->next->prev = this_thread;
978         ctdl_thread_internal_calc_loadavg();
979
980 //      citthread_mutex_unlock(&this_thread->ThreadMutex);
981         end_critical_section(S_THREAD_LIST);
982
983         return this_thread;
984 }
985
986 /*
987  * Wrapper function to create a thread
988  * ensures the critical section and other protections are in place.
989  * char *name = name to give to thread, if NULL, use generic name
990  * int flags = flags to determine type of thread and standard facilities
991  */
992 CtdlThreadNode *CtdlThreadCreate(char *name, long flags,
993                                  void *(*thread_func) (void *arg),
994                                  void *args)
995 {
996         CtdlThreadNode *ret = NULL;
997
998         ret = ctdl_internal_create_thread(name, flags, thread_func, args);
999         return ret;
1000 }
1001
1002
1003
1004 /*
1005  * Internal function to schedule a thread.
1006  * Must be called from within a S_THREAD_LIST critical section
1007  */
1008 CtdlThreadNode *CtdlThreadSchedule(char *name, long flags,
1009                                    void *(*thread_func) (void *arg),
1010                                    void *args, time_t when)
1011 {
1012         CtdlThreadNode *this_thread;
1013
1014         if (num_threads >= 32767) {
1015                 CtdlLogPrintf(CTDL_EMERG,
1016                               "Thread system. Thread list full.\n");
1017                 return NULL;
1018         }
1019
1020         this_thread = malloc(sizeof(CtdlThreadNode));
1021         if (this_thread == NULL) {
1022                 CtdlLogPrintf(CTDL_EMERG,
1023                               "Thread system, can't allocate CtdlThreadNode, exiting\n");
1024                 return NULL;
1025         }
1026         /* Initialise the thread structure */
1027         if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) {
1028                 free(this_thread);
1029                 CtdlLogPrintf(CTDL_EMERG,
1030                               "Thread system, can't initialise CtdlThreadNode, exiting\n");
1031                 return NULL;
1032         }
1033
1034         /*
1035          * If we got here we are going to create the thread so we must initilise the structure
1036          * first because most implimentations of threading can't create it in a stopped state
1037          * and it might want to do things with its structure that aren't initialised otherwise.
1038          */
1039         if (name) {
1040                 this_thread->name = name;
1041         } else {
1042                 this_thread->name = "Un-named Thread";
1043         }
1044
1045         this_thread->flags = flags;
1046         this_thread->thread_func = thread_func;
1047         this_thread->user_args = args;
1048
1049         /*
1050          * When to start this thread
1051          */
1052         this_thread->when = when;
1053
1054         begin_critical_section(S_SCHEDULE_LIST);
1055         this_thread->next = CtdlThreadSchedList;
1056         CtdlThreadSchedList = this_thread;
1057         if (this_thread->next)
1058                 this_thread->next->prev = this_thread;
1059         end_critical_section(S_SCHEDULE_LIST);
1060
1061         return this_thread;
1062 }
1063
1064
1065
1066 CtdlThreadNode *ctdl_thread_internal_start_scheduled(CtdlThreadNode *
1067                                                      this_thread)
1068 {
1069         int ret = 0;
1070
1071 //      citthread_mutex_lock(&that_thread->ThreadMutex);
1072         begin_critical_section(S_THREAD_LIST);
1073         /*
1074          * We pass this_thread into the thread as its args so that it can find out information
1075          * about itself and it has a bit of storage space for itself, not to mention that the REAL
1076          * thread function needs to finish off the setup of the structure
1077          */
1078         if ((ret =
1079              citthread_create(&this_thread->tid, &this_thread->attr,
1080                               ctdl_internal_thread_func,
1081                               this_thread) != 0)) {
1082                 end_critical_section(S_THREAD_LIST);
1083                 CtdlLogPrintf(CTDL_DEBUG,
1084                               "Failed to start scheduled thread \"%s\": %s\n",
1085                               this_thread->name, strerror(ret));
1086 //              citthread_mutex_unlock(&this_thread->ThreadMutex);
1087                 citthread_mutex_destroy(&(this_thread->ThreadMutex));
1088                 citthread_cond_destroy(&(this_thread->ThreadCond));
1089                 citthread_mutex_destroy(&(this_thread->SleepMutex));
1090                 citthread_cond_destroy(&(this_thread->SleepCond));
1091                 citthread_attr_destroy(&this_thread->attr);
1092                 free(this_thread);
1093                 return NULL;
1094         }
1095
1096
1097         num_threads++;          // Increase the count of threads in the system.
1098         if (this_thread->flags & CTDLTHREAD_WORKER)
1099                 num_workers++;
1100
1101         this_thread->next = CtdlThreadList;
1102         CtdlThreadList = this_thread;
1103         if (this_thread->next)
1104                 this_thread->next->prev = this_thread;
1105 //      citthread_mutex_unlock(&that_thread->ThreadMutex);
1106
1107         ctdl_thread_internal_calc_loadavg();
1108         end_critical_section(S_THREAD_LIST);
1109
1110
1111         return this_thread;
1112 }
1113
1114
1115
1116 void ctdl_thread_internal_check_scheduled(void)
1117 {
1118         CtdlThreadNode *this_thread, *that_thread;
1119         time_t now;
1120
1121         if (try_critical_section(S_SCHEDULE_LIST))
1122                 return;         /* If this list is locked we wait till the next chance */
1123
1124         now = time(NULL);
1125
1126 #ifdef WITH_THREADLOG
1127         CtdlLogPrintf(CTDL_DEBUG,
1128                       "Checking for scheduled threads to start.\n");
1129 #endif
1130
1131         this_thread = CtdlThreadSchedList;
1132         while (this_thread) {
1133                 that_thread = this_thread;
1134                 this_thread = this_thread->next;
1135
1136                 if (now > that_thread->when) {
1137                         /* Unlink from schedule list */
1138                         if (that_thread->prev)
1139                                 that_thread->prev->next =
1140                                     that_thread->next;
1141                         else
1142                                 CtdlThreadSchedList = that_thread->next;
1143                         if (that_thread->next)
1144                                 that_thread->next->prev =
1145                                     that_thread->prev;
1146
1147                         that_thread->next = that_thread->prev = NULL;
1148 #ifdef WITH_THREADLOG
1149                         CtdlLogPrintf(CTDL_DEBUG,
1150                                       "About to start scheduled thread \"%s\".\n",
1151                                       that_thread->name);
1152 #endif
1153                         if (CT->state > CTDL_THREAD_STOP_REQ) { /* Only start it if the system is not stopping */
1154                                 if (ctdl_thread_internal_start_scheduled
1155                                     (that_thread)) {
1156 #ifdef WITH_THREADLOG
1157                                         CtdlLogPrintf(CTDL_INFO,
1158                                                       "Thread system, Started a scheduled thread \"%s\" (%ud).\n",
1159                                                       that_thread->name,
1160                                                       that_thread->tid);
1161 #endif
1162                                 }
1163                         }
1164                 }
1165 #ifdef WITH_THREADLOG
1166                 else {
1167                         CtdlLogPrintf(CTDL_DEBUG,
1168                                       "Thread \"%s\" will start in %ld seconds.\n",
1169                                       that_thread->name,
1170                                       that_thread->when - time(NULL));
1171                 }
1172 #endif
1173         }
1174         end_critical_section(S_SCHEDULE_LIST);
1175 }
1176
1177
1178 /*
1179  * A warapper function for select so we can show a thread as blocked
1180  */
1181 int CtdlThreadSelect(int n, fd_set * readfds, fd_set * writefds,
1182                      fd_set * exceptfds, struct timeval *timeout)
1183 {
1184         int ret;
1185
1186         ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1187         ret = select(n, readfds, writefds, exceptfds, timeout);
1188         ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
1189         return ret;
1190 }
1191
1192
1193
1194 void *new_worker_thread(void *arg);
1195 extern void close_masters(void);
1196 void *select_on_master(void *args);
1197 void *select_on_client(void *args);
1198 CtdlThreadNode *client_select_thread;
1199 CtdlThreadNode *master_select_thread;
1200
1201 void go_threading(void)
1202 {
1203         int i;
1204         CtdlThreadNode *last_worker;
1205
1206         /*
1207          * Initialise the thread system
1208          */
1209         ctdl_thread_internal_init();
1210
1211         /* Second call to module init functions now that threading is up */
1212         initialise_modules(1);
1213         
1214         
1215                                 CtdlThreadCreate("House keeping",
1216                                                  CTDLTHREAD_BIGSTACK,
1217                                                  do_housekeeping, NULL);
1218
1219                                                  
1220 #ifdef NEW_WORKER
1221         master_select_thread = CtdlThreadCreate ("Select on Master", 0, select_on_master, NULL);
1222         client_select_thread = CtdlThreadCreate ("Select on client", 0, select_on_client, NULL);
1223 #endif
1224
1225         /*
1226          * This thread is now used for garbage collection of other threads in the thread list
1227          */
1228         CtdlLogPrintf(CTDL_INFO,
1229                       "Startup thread %d becoming garbage collector,\n",
1230                       citthread_self());
1231
1232         /*
1233          * We do a lot of locking and unlocking of the thread list in here.
1234          * We do this so that we can repeatedly release time for other threads
1235          * that may be waiting on the thread list.
1236          * We are a low priority thread so we can afford to do this
1237          */
1238
1239         while (CtdlThreadGetCount()) {
1240                 if (CT->signal)
1241                         exit_signal = CT->signal;
1242                 if (exit_signal) {
1243                         CtdlThreadStopAll();
1244 //                      close_masters();
1245                 }
1246                 check_sched_shutdown();
1247                 if (CT->state > CTDL_THREAD_STOP_REQ) {
1248                         begin_critical_section(S_THREAD_LIST);
1249                         ctdl_thread_internal_calc_loadavg();
1250                         end_critical_section(S_THREAD_LIST);
1251
1252                         ctdl_thread_internal_check_scheduled(); /* start scheduled threads */
1253                 }
1254
1255                 /* Reduce the size of the worker thread pool if necessary. */
1256                 if ((CtdlThreadGetWorkers() > config.c_min_workers + 1)
1257                     && (CtdlThreadWorkerAvg < 20)
1258                     && (CT->state > CTDL_THREAD_STOP_REQ)) {
1259                         /* Ask a worker thread to stop as we no longer need it */
1260                         begin_critical_section(S_THREAD_LIST);
1261                         last_worker = CtdlThreadList;
1262                         while (last_worker) {
1263                                 citthread_mutex_lock(&last_worker->
1264                                                      ThreadMutex);
1265                                 if (last_worker->flags & CTDLTHREAD_WORKER
1266                                     && (last_worker->state >
1267                                         CTDL_THREAD_STOPPING)
1268                                     && (last_worker->Context == NULL)) {
1269                                         citthread_mutex_unlock
1270                                             (&last_worker->ThreadMutex);
1271                                         break;
1272                                 }
1273                                 citthread_mutex_unlock(&last_worker->
1274                                                        ThreadMutex);
1275                                 last_worker = last_worker->next;
1276                         }
1277                         end_critical_section(S_THREAD_LIST);
1278                         if (last_worker) {
1279 #ifdef WITH_THREADLOG
1280                                 CtdlLogPrintf(CTDL_DEBUG,
1281                                               "Thread system, stopping excess worker thread \"%s\" (%ld).\n",
1282                                               last_worker->name,
1283                                               last_worker->tid);
1284 #endif
1285                                 CtdlThreadStop(last_worker);
1286                         }
1287                 }
1288
1289                 /*
1290                  * If all our workers are working hard, start some more to help out
1291                  * with things
1292                  */
1293                 /* FIXME: come up with a better way to dynamically alter the number of threads
1294                  * based on the system load
1295                  */
1296 #ifdef NEW_WORKER
1297                 if ((((CtdlThreadGetWorkers() < config.c_max_workers)
1298                       && (CtdlThreadGetWorkers() <= num_sessions))
1299                      || CtdlThreadGetWorkers() < config.c_min_workers)
1300                     && (CT->state > CTDL_THREAD_STOP_REQ))
1301 #else
1302                 if ((((CtdlThreadGetWorkers() < config.c_max_workers)
1303                       && (CtdlThreadGetWorkerAvg() > 60)
1304                       && (CtdlThreadGetLoadAvg() < 90))
1305                      || CtdlThreadGetWorkers() < config.c_min_workers)
1306                     && (CT->state > CTDL_THREAD_STOP_REQ))
1307 #endif                          /* NEW_WORKER */
1308                 {
1309                         for (i = 0; i < 5; i++) {
1310 #ifdef NEW_WORKER
1311                                 CtdlThreadCreate("Worker Thread (new)",
1312                                                  CTDLTHREAD_BIGSTACK +
1313                                                  CTDLTHREAD_WORKER,
1314                                                  new_worker_thread, NULL);
1315 #else
1316                                 CtdlThreadCreate("Worker Thread",
1317                                                  CTDLTHREAD_BIGSTACK +
1318                                                  CTDLTHREAD_WORKER,
1319                                                  worker_thread, NULL);
1320 #endif                          /* NEW_WORKER */
1321                         }
1322                 }
1323
1324                 CtdlThreadGC();
1325
1326                 if (CtdlThreadGetCount() <= 1)  // Shutting down clean up the garbage collector
1327                 {
1328                         CtdlThreadGC();
1329                 }
1330
1331                 if (CtdlThreadGetCount())
1332                         CtdlThreadSleep(1);
1333         }
1334         /*
1335          * If the above loop exits we must be shutting down since we obviously have no threads
1336          */
1337         ctdl_thread_internal_cleanup();
1338 }
1339
1340
1341
1342
1343
/*
 * Starting a new implementation of a worker thread.
 * This new implementation will be faster and do more work per thread.
 */
1348
// TODO: need to sort out the thread states and signals
// TODO: select_on_master should not be a big stack thread.
// TODO: select_on_client should not be a big stack thread.
// TODO: select_on_master is not a worker thread and should be blocked when in select
// TODO: select_on_client is not a worker thread and should be blocked when in select
1354 /**
1355  * Select on master socket.
1356  * One specific thread comes in here and never leaves.
1357  * This thread blocks on select until something happens.
1358  * The select only returns if a new connection is made or the select is interrupted by some means.
1359  * We need to interrupt the select if the list of ServiceHook's changes or we are shutting down.
1360  * We should probably use a signal to interrupt the select is a ServiceHook is created.
1361  * When a ServiceHook is destroyed its socket will close which will awaken the select.
1362  */
1363 void *select_on_master(void *arg)
1364 {
1365         fd_set readfds;
1366         struct ServiceFunctionHook *serviceptr;
1367         int ssock;              /* Descriptor for client socket */
1368         int highest;
1369         int m, i;
1370         int retval = 0;
1371         struct CitContext *con;
1372
1373
1374
1375         while (!CtdlThreadCheckStop()) {
1376                 CtdlThreadName("select_on_master");
1377
1378                 /* Initialize the fdset. */
1379                 FD_ZERO(&readfds);
1380                 highest = 0;
1381
1382                 /* First, add the various master sockets to the fdset. */
1383                 for (serviceptr = ServiceHookTable; serviceptr != NULL;
1384                      serviceptr = serviceptr->next) {
1385                         m = serviceptr->msock;
1386                         FD_SET(m, &readfds);
1387                         if (m > highest) {
1388                                 highest = m;
1389                         }
1390                 }
1391
1392         /** We can block indefinately since something will wake us up eventually
1393          * Even if it is a signal telling us the system is shutting down
1394          */
1395                 retval =
1396                     CtdlThreadSelect(highest + 1, &readfds, NULL, NULL,
1397                                      NULL);
1398
1399         /** Select got an error or we are shutting down so get out */
1400                 if (retval == 0 || CtdlThreadCheckStop()) {
1401                         return NULL;
1402                 }
1403
1404         /** Select says something happened on one of our master sockets so now we handle it */
1405                 for (serviceptr = ServiceHookTable; serviceptr != NULL;
1406                      serviceptr = serviceptr->next) {
1407                         if (FD_ISSET(serviceptr->msock, &readfds)) {
1408                                 ssock = accept(serviceptr->msock, NULL, 0);
1409                                 if (ssock >= 0) {
1410                                         CtdlLogPrintf(CTDL_DEBUG,
1411                                                       "New client socket %d\n",
1412                                                       ssock);
1413                                         /* The master socket is non-blocking but the client
1414                                          * sockets need to be blocking, otherwise certain
1415                                          * operations barf on FreeBSD.  Not a fatal error.
1416                                          */
1417                                         if (fcntl(ssock, F_SETFL, 0) < 0) {
1418                                                 CtdlLogPrintf(CTDL_EMERG,
1419                                                               "citserver: Can't set socket to blocking: %s\n",
1420                                                               strerror
1421                                                               (errno));
1422                                         }
1423                                         /* New context will be created already
1424                                          * set up in the CON_EXECUTING state.
1425                                          */
1426                                         con = CreateNewContext();
1427                                         /* Assign our new socket number to it. */
1428                                         con->client_socket = ssock;
1429                                         con->h_command_function =
1430                                             serviceptr->h_command_function;
1431                                         con->h_async_function =
1432                                             serviceptr->h_async_function;
1433                                         con->ServiceName =
1434                                             serviceptr->ServiceName;
1435                                         con->h_greeting_function = serviceptr->h_greeting_function;
1436                                         /* Determine whether it's a local socket */
1437                                         if (serviceptr->sockpath != NULL)
1438                                                 con->is_local_socket = 1;
1439
1440                                         /* Set the SO_REUSEADDR socket option */
1441                                         i = 1;
1442                                         setsockopt(ssock, SOL_SOCKET,
1443                                                    SO_REUSEADDR, &i,
1444                                                    sizeof(i));
1445
1446                         /** Now we can pass this context to an idle worker thread to get things going
1447                          * What if there are no idle workers?
1448                          * We could create one but what if the thread list is full?
1449                          * Then I guess we need to close the socket a reject the connection.
1450                          */
1451                         /** TODO: If there are no idle threads then this server is overloaded and we should reject the connection
1452                          * This will have the effect of throttling the incomming connections on master sockets
1453                          * a little and slow the process down.
1454                          */
1455 //                                      if (idle_workers)
1456                                         {
1457                                                 con->state = CON_START;
1458                                                 citthread_kill(client_select_thread->tid, SIGUSR1);
1459                                                 citthread_cond_signal(&worker_block);
1460                                         }
1461                                         // else
1462                                         // output try later message
1463                                         //start_context(con);
1464                                 }
1465                         }
1466                 }
1467         }
1468         return NULL;
1469 }
1470
1471
1472
/*
 * Select on client socket.
 * Only one dedicated thread in here.
 * We have to interrupt our select whenever a context is returned to the CON_READY state.
 * As a result, each context may be close to timing out its client, so we have to calculate
 * which client socket will time out first and expire our select at that time.
 */
1481 void *select_on_client(void *arg)
1482 {
1483         fd_set readfds;
1484         struct timeval tv, now, result;
1485         int retval = 0;
1486         int highest;
1487         struct CitContext *ptr;
1488
1489         CtdlThreadName("select_on_client");
1490
1491         while (!CtdlThreadCheckStop()) {
1492                 /* Initialise the fdset */
1493                 FD_ZERO(&readfds);
1494                 highest = 0;
1495                 /** Get the clients to select on */
1496                 tv.tv_sec = config.c_sleeping;
1497                 tv.tv_usec = 0;
1498                 begin_critical_section(S_SESSION_TABLE);
1499                 for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
1500                         if (ptr->state == CON_IDLE) {
1501                                 gettimeofday(&now, NULL);
1502                                 timersub(&(ptr->client_expires_at),
1503                                          &now, &result);
1504                                 if (result.tv_sec <= 0) {
1505                                         /** This client has timed out so kill it */
1506                                         ptr->kill_me = 1;
1507                                         continue;
1508                                 }
1509                                 /** Is this one going to expire first? */
1510                                 timersub(&result, &tv, &now);
1511                                 if (now.tv_sec <= 0 && now.tv_usec <= 0) {
1512                                         tv.tv_sec = result.tv_sec;
1513                                         tv.tv_usec = result.tv_usec;
1514                                 }
1515                                 FD_SET(ptr->client_socket, &readfds);
1516                                 if (ptr->client_socket > highest)
1517                                         highest = ptr->client_socket;
1518                         }
1519                 }
1520                 end_critical_section(S_SESSION_TABLE);
1521
1522                 /* Now we can select on any connections that are waiting */
1523                 if (!CtdlThreadCheckStop()) {
1524                         retval =
1525                             CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv);
1526                 } else {        /* Shutting down? */
1527
1528                         return NULL;
1529                 }
1530
1531
1532                 /* Now figure out who made this select() unblock.
1533                  * First, check for an error or exit condition.
1534                  */
1535                 if (retval < 0) {
1536                         if (errno == EBADF) {
1537                                 CtdlLogPrintf(CTDL_NOTICE,
1538                                               "select() failed: (%s)\n",
1539                                               strerror(errno));
1540                         }
1541                         if (errno != EINTR) {
1542                                 CtdlLogPrintf(CTDL_EMERG,
1543                                               "Exiting (%s)\n",
1544                                               strerror(errno));
1545                                 CtdlThreadStopAll();
1546                         } else if (!CtdlThreadCheckStop()) {
1547                                 CtdlLogPrintf(CTDL_DEBUG,
1548                                               "Un handled select failure.\n");
1549                         }
1550                 } else if (retval > 0) {
1551                         begin_critical_section(S_SESSION_TABLE);
1552                         for (ptr = ContextList; ptr != NULL;
1553                              ptr = ptr->next) {
1554                                 if ((FD_ISSET
1555                                      (ptr->client_socket, &readfds))
1556                                     && (ptr->state == CON_IDLE)) {
1557                                         ptr->input_waiting = 1;
1558                                         ptr->state = CON_READY;
1559                                         /** reset client expire time */
1560                                         ptr->client_expires_at.tv_sec = config.c_sleeping;
1561                                         ptr->client_expires_at.tv_usec = 0;
1562                                 }
1563                         }
1564                         end_critical_section(S_SESSION_TABLE);
1565                 }
1566         }
1567         return NULL;
1568 }
1569
1570
1571
1572 /*
1573  * Do the worker threads work when needed
1574  */
1575 int execute_session(struct CitContext *bind_me)
1576 {
1577         int force_purge;
1578
1579         become_session(bind_me);
1580
1581         /* If the client has sent a command, execute it. */
1582         if (CC->input_waiting) {
1583                 CC->h_command_function();
1584                 CC->input_waiting = 0;
1585         }
1586
1587         /* If there are asynchronous messages waiting and the
1588          * client supports it, do those now */
1589         if ((CC->is_async) && (CC->async_waiting)
1590             && (CC->h_async_function != NULL)) {
1591                 CC->h_async_function();
1592                 CC->async_waiting = 0;
1593         }
1594
1595         force_purge = CC->kill_me;
1596         if (force_purge)
1597                 CT->Context = NULL;
1598         become_session(NULL);
1599         bind_me->state = CON_IDLE;
1600         return force_purge;
1601 }
1602
1603
1604
1605 extern void dead_session_purge(int force);
1606
1607 /*
1608  * A new worker_thread loop.
1609  */
1610
1611 void *new_worker_thread(void *arg)
1612 {
1613         struct CitContext *bind_me, *ptr;
1614         int force_purge;
1615
1616         while (!CtdlThreadCheckStop()) {
1617
1618                 /* make doubly sure we're not holding any stale db handles
1619                  * which might cause a deadlock.
1620                  */
1621                 cdb_check_handles();
1622                 force_purge = 0;
1623                 bind_me = NULL; /* Which session shall we handle? */
1624
1625                 begin_critical_section(S_SESSION_TABLE);
1626                 for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
1627                         if (ptr->state == CON_START) {
1628                                 ptr->state = CON_EXECUTING;
1629                                 end_critical_section(S_SESSION_TABLE);
1630                                 become_session(ptr);
1631                                 begin_session(ptr);
1632                                 ptr->h_greeting_function();
1633                                 become_session(NULL);
1634                                 ptr->state = CON_IDLE;
1635                                 break;
1636                         }
1637                         if (ptr->state == CON_READY) {
1638                                 ptr->state = CON_EXECUTING;
1639                                 end_critical_section(S_SESSION_TABLE);
1640                                 force_purge = execute_session(ptr);
1641                                 break;
1642                         }
1643
1644                 }
1645                 end_critical_section(S_SESSION_TABLE);
1646
1647                 dead_session_purge(force_purge);
1648                 
1649                 /** block the worker threads waiting for a select to do something */
1650                 idle_workers++;
1651                 ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
1652                 citthread_mutex_lock(&worker_block_mutex);
1653                 citthread_cond_wait(&worker_block, &worker_block_mutex);
1654                 citthread_mutex_unlock(&worker_block_mutex);
1655                 ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
1656                 idle_workers--;
1657                 
1658                 if (CtdlThreadCheckStop())
1659                         break;
1660
1661         }
1662         return NULL;
1663 }