]> code.citadel.org Git - citadel.git/blobdiff - citadel/threads.c
Did away with lprintf all together now its called CtdlLogPrintf()
[citadel.git] / citadel / threads.c
index 7d689ca6e72f45217ce4b20efe840dc6f7e0a62d..c4003b012086dc9eba69cbbdcff3448fb09e5b6d 100644 (file)
@@ -8,6 +8,7 @@
  *
  */
 
+#include <sys/types.h>
 #include <errno.h>
 #include <sys/socket.h>
 #include <unistd.h>
@@ -56,7 +57,7 @@ static int num_workers = 0;                   /* Current number of worker threads */
 CtdlThreadNode *CtdlThreadList = NULL;
 CtdlThreadNode *CtdlThreadSchedList = NULL;
 
-static citthread_t GC_thread;
+static CtdlThreadNode *GC_thread = NULL;
 static char *CtdlThreadStates[CTDL_THREAD_LAST_STATE];
 double CtdlThreadLoadAvg = 0;
 double CtdlThreadWorkerAvg = 0;
@@ -152,8 +153,7 @@ void ctdl_thread_internal_init_tsd(void)
        int ret;
        
        if ((ret = citthread_key_create(&ThreadKey, ctdl_thread_internal_dest_tsd))) {
-               lprintf(CTDL_EMERG, "citthread_key_create: %s\n",
-                       strerror(ret));
+               CtdlLogPrintf(CTDL_EMERG, "citthread_key_create: %s\n", strerror(ret));
                exit(CTDLEXIT_DB);
        }
 }
@@ -220,7 +220,6 @@ void ctdl_thread_internal_init(void)
        CtdlThreadNode *this_thread;
        int ret = 0;
        
-       GC_thread = citthread_self();
        CtdlThreadStates[CTDL_THREAD_INVALID] = strdup ("Invalid Thread");
        CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread");
        CtdlThreadStates[CTDL_THREAD_CREATE] = strdup("Thread being Created");
@@ -257,7 +256,8 @@ void ctdl_thread_internal_init(void)
 
        this_thread->name = "Garbage Collection Thread";
        
-       this_thread->tid = GC_thread;
+       this_thread->tid = citthread_self();
+       GC_thread = this_thread;
        CT = this_thread;
        
        num_threads++;  // Increase the count of threads in the system.
@@ -337,7 +337,8 @@ void CtdlThreadStopAll(void)
                ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ);
                citthread_cond_signal(&this_thread->ThreadCond);
                citthread_cond_signal(&this_thread->SleepCond);
-               CtdlLogPrintf(CTDL_DEBUG, "Thread system stopping thread \"%s\" (%ld).\n", this_thread->name, this_thread->tid);
+               CtdlLogPrintf(CTDL_DEBUG, "Thread system stopping thread \"%s\" (0x%08lx).\n",
+                       this_thread->name, this_thread->tid);
                this_thread = this_thread->next;
        }
        end_critical_section(S_THREAD_LIST);
@@ -550,7 +551,7 @@ static void ctdl_internal_thread_cleanup(void *arg)
         * In here we were called by the current thread because it is exiting
         * NB. WE ARE THE CURRENT THREAD
         */
-       CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (%ld) exited.\n", CT->name, CT->tid);
+       CtdlLogPrintf(CTDL_NOTICE, "Thread \"%s\" (0x%08lx) exited.\n", CT->name, CT->tid);
        
        #ifdef HAVE_BACKTRACE
        eCrash_UnregisterThread();
@@ -625,9 +626,9 @@ void CtdlThreadGC (void)
        if(num_threads == 1)
                CtdlThreadList->state = CTDL_THREAD_EXITED;
        
-#ifdef WITH_THREADLOG
+// #ifdef WITH_THREADLOG
        CtdlLogPrintf(CTDL_DEBUG, "Thread system running garbage collection.\n");
-#endif
+// #endif
        /*
         * Woke up to do garbage collection
         */
@@ -637,8 +638,31 @@ void CtdlThreadGC (void)
                that_thread = this_thread;
                this_thread = this_thread->next;
                
+               if ((that_thread->state == CTDL_THREAD_STOP_REQ || that_thread->state == CTDL_THREAD_STOPPING)
+                       && (!citthread_equal(that_thread->tid, citthread_self())))
+                               that_thread->stop_ticker++;
+               else
+               {
+                       /**
+                        * Catch the situation where a worker was asked to stop but couldn't and we are not
+                        * shutting down.
+                        */
+                       that_thread->stop_ticker = 0;
+               }
+               
+               if (that_thread->stop_ticker == 5)
+               {
+                       CtdlLogPrintf(CTDL_DEBUG, "Thread System: The thread \"%s\" (0x%08lx) failed to self terminate within 5 ticks. It would be cancelled now.\n", that_thread->name, that_thread->tid);
+                       if ((that_thread->flags & CTDLTHREAD_WORKER) == 0)
+                               CtdlLogPrintf(CTDL_INFO, "Thread System: A non worker thread would have been canceled this may cause message loss.\n");
+//                     that_thread->state = CTDL_THREAD_CANCELLED;
+                       that_thread->stop_ticker++;
+//                     citthread_cancel(that_thread->tid);
+//                     continue;
+               }
+               
                /* Do we need to clean up this thread? */
-               if (that_thread->state != CTDL_THREAD_EXITED)
+               if ((that_thread->state != CTDL_THREAD_EXITED) && (that_thread->state != CTDL_THREAD_CANCELLED))
                {
                        if(that_thread->flags & CTDLTHREAD_WORKER)
                                workers++;      /* Sanity check on number of worker threads */
@@ -689,11 +713,12 @@ void CtdlThreadGC (void)
                else if (ret == ESRCH)
                        CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, no thread to join on.\n");
                else if (ret != 0)
-                       CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, citthread_join returned an unknown error.\n");
+                       CtdlLogPrintf(CTDL_DEBUG, "Garbage collection, citthread_join returned an unknown error(%d).\n", ret);
                /*
                 * Now we own that thread entry
                 */
-               CtdlLogPrintf(CTDL_INFO, "Garbage Collection for thread \"%s\" (%ld).\n", that_thread->name, that_thread->tid);
+               CtdlLogPrintf(CTDL_INFO, "Garbage Collection for thread \"%s\" (0x%08lx).\n",
+                       that_thread->name, that_thread->tid);
                citthread_mutex_destroy(&that_thread->ThreadMutex);
                citthread_cond_destroy(&that_thread->ThreadCond);
                citthread_mutex_destroy(&that_thread->SleepMutex);
@@ -764,9 +789,8 @@ static void *ctdl_internal_thread_func (void *arg)
        #endif
        
        // Tell the world we are here
-       CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (%ld). \n", this_thread->name, this_thread->tid);
-
-       
+       CtdlLogPrintf(CTDL_NOTICE, "Created a new thread \"%s\" (0x%08lx).\n",
+               this_thread->name, this_thread->tid);
        
        /*
         * run the thread to do the work but only if we haven't been asked to stop
@@ -1105,7 +1129,8 @@ void ctdl_thread_internal_check_scheduled(void)
 #ifdef WITH_THREADLOG
                else
                {
-                       CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" will start in %ld seconds.\n", that_thread->name, that_thread->when - time(NULL));
+                       CtdlLogPrintf(CTDL_DEBUG, "Thread \"%s\" will start in %ld seconds.\n",
+                               that_thread->name, that_thread->when - time(NULL));
                }
 #endif
        }
@@ -1122,7 +1147,36 @@ int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds
        
        ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED);
        ret = select(n, readfds, writefds, exceptfds, timeout);
-       ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
+       /**
+        * If the select returned <= 0 then it failed due to an error
+        * or timeout so this thread could stop if asked to do so.
+        * Anything else means it needs to continue unless the system is shutting down
+        */
+       if (ret <= 0)
+       {
+               /**
+                * select says nothing to do so we can change to running if we haven't been asked to stop.
+                */
+               ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING);
+       }
+       else
+       {
+               /**
+                * The select says this thread needs to do something useful.
+                * This thread was in an idle state so it may have been asked to stop
+                * but if the system isn't shutting down this thread is no longer
+                * idle and select has given it a task to do so it must not stop
+                * In this condition we need to force it into the running state.
+                * CtdlThreadGC will clear its ticker for us.
+                */
+               if (GC_thread->state > CTDL_THREAD_STOP_REQ)
+               {
+                       citthread_mutex_lock(&CT->ThreadMutex); /* To prevent race condition of a sleeping thread */
+                       CT->state = CTDL_THREAD_RUNNING;
+                       citthread_mutex_unlock(&CT->ThreadMutex);
+               }
+       }
+
        return ret;
 }
 
@@ -1198,7 +1252,7 @@ void go_threading(void)
                        if (last_worker)
                        {
 #ifdef WITH_THREADLOG
-                               CtdlLogPrintf(CTDL_DEBUG, "Thread system, stopping excess worker thread \"%s\" (%ld).\n",
+                               CtdlLogPrintf(CTDL_DEBUG, "Thread system, stopping excess worker thread \"%s\" (0x%08lx).\n",
                                        last_worker->name,
                                        last_worker->tid
                                        );