]> code.citadel.org Git - citadel.git/blobdiff - citadel/sysdep.c
* The scheduler can now "wake up" a session to deliver async messages.
[citadel.git] / citadel / sysdep.c
index eb3c7bcec689a7c10627569a2e6b40ba2a3530bd..6da856165e68c7b91c7a6ef9b05135f2df6deba0 100644 (file)
@@ -91,14 +91,10 @@ pthread_key_t MyConKey;                             /* TSD key for MyContext() */
 int verbosity = DEFAULT_VERBOSITY;             /* Logging level */
 
 struct CitContext masterCC;
-int rescan[2];                                 /* The Rescan Pipe */
 time_t last_purge = 0;                         /* Last dead session purge */
 static int num_threads = 0;                    /* Current number of threads */
 int num_sessions = 0;                          /* Current number of sessions */
 
-fd_set masterfds;                              /* Master sockets etc. */
-int masterhighest;
-
 pthread_t initial_thread;              /* tid for main() thread */
 
 int syslog_facility = (-1);
@@ -250,16 +246,17 @@ void begin_critical_section(int which_one)
 {
        /* lprintf(CTDL_DEBUG, "begin_critical_section(%d)\n", which_one); */
 
-
-       /* ensure nobody ever tries to do a critical section within a
-               transaction; this could lead to deadlock. */
+       /* For all types of critical sections except those listed here,
+        * ensure nobody ever tries to do a critical section within a
+        * transaction; this could lead to deadlock.
+        */
+       if (    (which_one != S_FLOORCACHE)
 #ifdef DEBUG_MEMORY_LEAKS
-       if (which_one != S_DEBUGMEMLEAKS) {
+               && (which_one != S_DEBUGMEMLEAKS)
 #endif
+       ) {
                cdb_check_handles();
-#ifdef DEBUG_MEMORY_LEAKS
        }
-#endif
        pthread_mutex_lock(&Critters[which_one]);
 }
 
@@ -268,7 +265,6 @@ void begin_critical_section(int which_one)
  */
 void end_critical_section(int which_one)
 {
-       /* lprintf(CTDL_DEBUG, "end_critical_section(%d)\n", which_one); */
        pthread_mutex_unlock(&Critters[which_one]);
 }
 
@@ -777,17 +773,21 @@ void create_worker(void) {
  * Purge all sessions which have the 'kill_me' flag set.
  * This function has code to prevent it from running more than once every
  * few seconds, because running it after every single unbind would waste a lot
- * of CPU time and keep the context list locked too much.
+ * of CPU time and keep the context list locked too much.  To force it to run
+ * anyway, set "force" to nonzero.
+ *
  *
- * After that's done, we raise or lower the size of the worker thread pool
+ * After that's done, we raise the size of the worker thread pool
  * if such an action is appropriate.
  */
-void dead_session_purge(void) {
+void dead_session_purge(int force) {
        struct CitContext *ptr, *rem;
-       struct worker_node **node, *tmp;
-       pthread_t self;
 
-       if ( (time(NULL) - last_purge) < 5 ) return;    /* Too soon, go away */
+       if (force == 0) {
+               if ( (time(NULL) - last_purge) < 5 ) {
+                       return; /* Too soon, go away */
+               }
+       }
        time(&last_purge);
 
        do {
@@ -810,12 +810,7 @@ void dead_session_purge(void) {
 
        } while (rem != NULL);
 
-
-       /* Raise or lower the size of the worker thread pool if such
-        * an action is appropriate.
-        */
-
-       self = pthread_self();
+       /* Raise the size of the worker thread pool if necessary. */
 
        if ( (num_sessions > num_threads)
           && (num_threads < config.c_max_workers) ) {
@@ -823,33 +818,6 @@ void dead_session_purge(void) {
                create_worker();
                end_critical_section(S_WORKER_LIST);
        }
-       
-       /* don't let the initial thread die since it's responsible for
-          waiting for all the other threads to terminate. */
-       else if ( (num_sessions < num_threads)
-          && (num_threads > config.c_min_workers)
-          && (self != initial_thread) ) {
-               cdb_free_tsd();
-               begin_critical_section(S_WORKER_LIST);
-               --num_threads;
-
-               /* we're exiting before server shutdown... unlink ourself from
-                  the worker list and detach our thread to avoid memory leaks
-                */
-
-               for (node = &worker_list; *node != NULL; node = &(*node)->next)
-                       if ((*node)->tid == self) {
-                               tmp = *node;
-                               *node = (*node)->next;
-                               free(tmp);
-                               break;
-                       }
-
-               pthread_detach(self);
-               end_critical_section(S_WORKER_LIST);
-               pthread_exit(NULL);
-       }
-
 }
 
 
@@ -884,37 +852,7 @@ void InitializeMasterCC(void) {
 
 
 
-/*
- * Set up a fd_set containing all the master sockets to which we
- * always listen.  It's computationally less expensive to just copy
- * this to a local fd_set when starting a new select() and then add
- * the client sockets than it is to initialize a new one and then
- * figure out what to put there.
- */
-void init_master_fdset(void) {
-       struct ServiceFunctionHook *serviceptr;
-       int m;
-
-       lprintf(CTDL_DEBUG, "Initializing master fdset\n");
-
-       FD_ZERO(&masterfds);
-       masterhighest = 0;
 
-       lprintf(CTDL_DEBUG, "Will listen on rescan pipe %d\n", rescan[0]);
-       FD_SET(rescan[0], &masterfds);
-       if (rescan[0] > masterhighest) masterhighest = rescan[0];
-
-       for (serviceptr = ServiceHookTable; serviceptr != NULL;
-           serviceptr = serviceptr->next ) {
-               m = serviceptr->msock;
-               lprintf(CTDL_DEBUG, "Will listen on master socket %d\n", m);
-               FD_SET(m, &masterfds);
-               if (m > masterhighest) {
-                       masterhighest = m;
-               }
-       }
-       lprintf(CTDL_DEBUG, "masterhighest = %d\n", masterhighest);
-}
 
 
 /*
@@ -931,10 +869,7 @@ INLINE void become_session(struct CitContext *which_con) {
  */    
 void *worker_thread(void *arg) {
        int i;
-       char junk;
        int highest;
-       /* This is synchronized below; it helps implement round robin mode */
-       static struct CitContext* next_session = NULL;
        struct CitContext *ptr;
        struct CitContext *bind_me = NULL;
        fd_set readfds;
@@ -943,6 +878,8 @@ void *worker_thread(void *arg) {
        struct ServiceFunctionHook *serviceptr;
        int ssock;                      /* Descriptor for client socket */
        struct timeval tv;
+       int force_purge = 0;
+       int m;
 
        num_threads++;
 
@@ -966,10 +903,15 @@ void *worker_thread(void *arg) {
                 * which might cause a deadlock.
                 */
                cdb_check_handles();
+               force_purge = 0;
+               bind_me = NULL;         /* Which session shall we handle? */
 
                begin_critical_section(S_I_WANNA_SELECT);
-SETUP_FD:      memcpy(&readfds, &masterfds, sizeof masterfds);
-               highest = masterhighest;
+
+               /* Initialize the fdset. */
+               FD_ZERO(&readfds);
+               highest = 0;
+
                begin_critical_section(S_SESSION_TABLE);
                for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
                        if (ptr->state == CON_IDLE) {
@@ -977,15 +919,32 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds);
                                if (ptr->client_socket > highest)
                                        highest = ptr->client_socket;
                        }
+                       if ((bind_me == NULL) && (ptr->state == CON_READY)) {
+                               bind_me = ptr;
+                               ptr->state = CON_EXECUTING;
+                       }
                }
                end_critical_section(S_SESSION_TABLE);
 
-               tv.tv_sec = 1;          /* wake up every second if no input */
-               tv.tv_usec = 0;
+               if (bind_me) goto SKIP_SELECT;
 
-               do_select:
-               if (!time_to_die)
+do_select:     /* Only select() if the server isn't being told to shut down. */
+
+               /* Add the various master sockets to the fdset. */
+               for (serviceptr = ServiceHookTable; serviceptr != NULL;
+               serviceptr = serviceptr->next ) {
+                       m = serviceptr->msock;
+                       FD_SET(m, &readfds);
+                       if (m > highest) {
+                               highest = m;
+                       }
+               }
+
+               if (!time_to_die) {
+                       tv.tv_sec = 1;          /* wake up every second if no input */
+                       tv.tv_usec = 0;
                        retval = select(highest + 1, &readfds, NULL, NULL, &tv);
+               }
                else {
                        end_critical_section(S_I_WANNA_SELECT);
                        break;
@@ -995,6 +954,11 @@ SETUP_FD:  memcpy(&readfds, &masterfds, sizeof masterfds);
                 * First, check for an error or exit condition.
                 */
                if (retval < 0) {
+                       if (errno == EBADF) {
+                               lprintf(CTDL_NOTICE, "select() failed: (%s)\n",
+                                       strerror(errno));
+                               goto do_select;
+                       }
                        if (errno != EINTR) {
                                lprintf(CTDL_EMERG, "Exiting (%s)\n", strerror(errno));
                                time_to_die = 1;
@@ -1029,6 +993,8 @@ SETUP_FD:  memcpy(&readfds, &masterfds, sizeof masterfds);
                                        con->client_socket = ssock;
                                        con->h_command_function =
                                                serviceptr->h_command_function;
+                                       con->h_async_function =
+                                               serviceptr->h_async_function;
 
                                        /* Determine whether local socket */
                                        if (serviceptr->sockpath != NULL)
@@ -1045,79 +1011,63 @@ SETUP_FD:       memcpy(&readfds, &masterfds, sizeof masterfds);
                                        serviceptr->h_greeting_function();
                                        become_session(NULL);
                                        con->state = CON_IDLE;
-                                       goto SETUP_FD;
+                                       goto do_select;
                                }
                        }
                }
 
-               /* If the rescan pipe went active, someone is telling this
-                * thread that the &readfds needs to be refreshed with more
-                * current data.
-                */
                if (time_to_die) {
                        end_critical_section(S_I_WANNA_SELECT);
                        break;
                }
 
-               if (FD_ISSET(rescan[0], &readfds)) {
-                       read(rescan[0], &junk, 1);
-                       goto SETUP_FD;
-               }
-
                /* It must be a client socket.  Find a context that has data
-                * waiting on its socket *and* is in the CON_IDLE state.
+                * waiting on its socket *and* is in the CON_IDLE state.  Any
+                * active sockets other than our chosen one are marked as
+                * CON_READY so the next thread that comes around can just bind
+                * to one without having to select() again.
                 */
-               else {
-                       bind_me = NULL;
-                       begin_critical_section(S_SESSION_TABLE);
-                       /*
-                        * We start where we left off.  If we get to the end
-                        * we'll start from the beginning again, then give up
-                        * if we still don't find anything.  This ensures
-                        * that all contexts get a more-or-less equal chance
-                        * to run. And yes, I did add a goto to the code. -IO
-                        */
-find_session:          if (next_session == NULL)
-                               next_session = ContextList;
-                       for (ptr = next_session;
-                           ( (ptr != NULL) && (bind_me == NULL) );
-                           ptr = ptr->next) {
-                               if ( (FD_ISSET(ptr->client_socket, &readfds))
-                                  && (ptr->state == CON_IDLE) ) {
-                                       bind_me = ptr;
+               begin_critical_section(S_SESSION_TABLE);
+               for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
+                       if ( (FD_ISSET(ptr->client_socket, &readfds))
+                          && (ptr->state != CON_EXECUTING) ) {
+                               ptr->input_waiting = 1;
+                               if (!bind_me) {
+                                       bind_me = ptr;  /* I choose you! */
+                                       bind_me->state = CON_EXECUTING;
+                               }
+                               else {
+                                       ptr->state = CON_READY;
                                }
                        }
-                       if (bind_me != NULL) {
-                               /* Found one.  Stake a claim to it before
-                                * letting anyone else touch the context list.
-                                */
-                               bind_me->state = CON_EXECUTING;
-                               next_session = bind_me->next;
-                       } else if (next_session == ContextList) {
-                               next_session = NULL;
-                       }
-                       if (bind_me == NULL && next_session != NULL) {
-                               next_session = NULL;
-                               goto find_session;
-                       }
+               }
+               end_critical_section(S_SESSION_TABLE);
+SKIP_SELECT:   end_critical_section(S_I_WANNA_SELECT);
 
-                       end_critical_section(S_SESSION_TABLE);
-                       end_critical_section(S_I_WANNA_SELECT);
+               /* We're bound to a session */
+               if (bind_me != NULL) {
+                       become_session(bind_me);
 
-                       /* We're bound to a session, now do *one* command */
-                       if (bind_me != NULL) {
-                               become_session(bind_me);
+                       /* If the client has sent a command, execute it. */
+                       if (CC->input_waiting) {
                                CC->h_command_function();
-                               become_session(NULL);
-                               bind_me->state = CON_IDLE;
-                               if (bind_me->kill_me == 1) {
-                                       RemoveContext(bind_me);
-                               } 
-                               write(rescan[1], &junk, 1);
+                               CC->input_waiting = 0;
                        }
 
+                       /* If there are asynchronous messages waiting and the
+                        * client supports it, do those now */
+                       if ((CC->is_async) && (CC->async_waiting)
+                          && (CC->h_async_function != NULL)) {
+                               CC->h_async_function();
+                               CC->async_waiting = 0;
+                       }
+                       
+                       force_purge = CC->kill_me;
+                       become_session(NULL);
+                       bind_me->state = CON_IDLE;
                }
-               dead_session_purge();
+
+               dead_session_purge(force_purge);
                do_housekeeping();
                check_sched_shutdown();
        }