]> code.citadel.org Git - citadel.git/blobdiff - citadel/sysdep.c
* Replaced all "Citadel/UX" references with "Citadel"
[citadel.git] / citadel / sysdep.c
index 99629d7f1244062c0172e6b236b105ba3b9ceebd..9c97ef21df290107394eef25b319dd3c2d80684d 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * $Id$
  *
- * Citadel/UX "system dependent" stuff.
+ * Citadel "system dependent" stuff.
  * See copyright.txt for copyright information.
  *
  * Here's where we (hopefully) have most parts of the Citadel server that
@@ -91,14 +91,10 @@ pthread_key_t MyConKey;                             /* TSD key for MyContext() */
 int verbosity = DEFAULT_VERBOSITY;             /* Logging level */
 
 struct CitContext masterCC;
-int rescan[2];                                 /* The Rescan Pipe */
 time_t last_purge = 0;                         /* Last dead session purge */
 static int num_threads = 0;                    /* Current number of threads */
 int num_sessions = 0;                          /* Current number of sessions */
 
-fd_set masterfds;                              /* Master sockets etc. */
-int masterhighest;
-
 pthread_t initial_thread;              /* tid for main() thread */
 
 int syslog_facility = (-1);
@@ -111,12 +107,12 @@ int syslog_facility = (-1);
  * log data sent through this function.  BE CAREFUL!
  */
 void lprintf(enum LogLevel loglevel, const char *format, ...) {   
-        va_list arg_ptr;
+       va_list arg_ptr;
        char buf[SIZ];
  
-        va_start(arg_ptr, format);   
-        vsnprintf(buf, sizeof(buf), format, arg_ptr);   
-        va_end(arg_ptr);   
+       va_start(arg_ptr, format);   
+       vsnprintf(buf, sizeof(buf), format, arg_ptr);   
+       va_end(arg_ptr);   
 
        if (syslog_facility >= 0) {
                if (loglevel <= verbosity) {
@@ -250,16 +246,17 @@ void begin_critical_section(int which_one)
 {
        /* lprintf(CTDL_DEBUG, "begin_critical_section(%d)\n", which_one); */
 
-
-       /* ensure nobody ever tries to do a critical section within a
-               transaction; this could lead to deadlock. */
+       /* For all types of critical sections except those listed here,
+        * ensure nobody ever tries to do a critical section within a
+        * transaction; this could lead to deadlock.
+        */
+       if (    (which_one != S_FLOORCACHE)
 #ifdef DEBUG_MEMORY_LEAKS
-       if (which_one != S_DEBUGMEMLEAKS) {
+               && (which_one != S_DEBUGMEMLEAKS)
 #endif
+       ) {
                cdb_check_handles();
-#ifdef DEBUG_MEMORY_LEAKS
        }
-#endif
        pthread_mutex_lock(&Critters[which_one]);
 }
 
@@ -268,7 +265,6 @@ void begin_critical_section(int which_one)
  */
 void end_critical_section(int which_one)
 {
-       /* lprintf(CTDL_DEBUG, "end_critical_section(%d)\n", which_one); */
        pthread_mutex_unlock(&Critters[which_one]);
 }
 
@@ -279,7 +275,7 @@ void end_critical_section(int which_one)
  * a TCP port.  The server shuts down if the bind fails.
  *
  */
-int ig_tcp_server(int port_number, int queue_len)
+int ig_tcp_server(char *ip_addr, int port_number, int queue_len)
 {
        struct sockaddr_in sin;
        int s, i;
@@ -290,8 +286,17 @@ int ig_tcp_server(int port_number, int queue_len)
 
        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
-       sin.sin_addr.s_addr = INADDR_ANY;
        sin.sin_port = htons((u_short)port_number);
+       if (ip_addr == NULL) {
+               sin.sin_addr.s_addr = INADDR_ANY;
+       }
+       else {
+               sin.sin_addr.s_addr = inet_addr(ip_addr);
+       }
+                                                                               
+       if (sin.sin_addr.s_addr == INADDR_NONE) {
+               sin.sin_addr.s_addr = INADDR_ANY;
+       }
 
        s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
 
@@ -452,7 +457,7 @@ DONE:       ++num_sessions;
 
 /*
  * buffer_output() ... tell client_write to buffer all output until
- *                     instructed to dump it all out later
+ *                  instructed to dump it all out later
  */
 void buffer_output(void) {
        if (CC->buffering == 0) {
@@ -533,15 +538,15 @@ void client_write(char *buf, int nbytes)
 
 /*
  * cprintf()  ...   Send formatted printable data to the client.   It is
- *                  implemented in terms of client_write() but remains in
- *                  sysdep.c in case we port to somewhere without va_args...
+ *               implemented in terms of client_write() but remains in
+ *               sysdep.c in case we port to somewhere without va_args...
  */
 void cprintf(const char *format, ...) {   
-        va_list arg_ptr;   
-        char buf[SIZ];   
+       va_list arg_ptr;   
+       char buf[SIZ];   
    
-        va_start(arg_ptr, format);   
-        if (vsnprintf(buf, sizeof buf, format, arg_ptr) == -1)
+       va_start(arg_ptr, format);   
+       if (vsnprintf(buf, sizeof buf, format, arg_ptr) == -1)
                buf[sizeof buf - 2] = '\n';
        client_write(buf, strlen(buf)); 
        va_end(arg_ptr);
@@ -777,17 +782,21 @@ void create_worker(void) {
  * Purge all sessions which have the 'kill_me' flag set.
  * This function has code to prevent it from running more than once every
  * few seconds, because running it after every single unbind would waste a lot
- * of CPU time and keep the context list locked too much.
+ * of CPU time and keep the context list locked too much.  To force it to run
+ * anyway, set "force" to nonzero.
  *
- * After that's done, we raise or lower the size of the worker thread pool
+ *
+ * After that's done, we raise the size of the worker thread pool
  * if such an action is appropriate.
  */
-void dead_session_purge(void) {
+void dead_session_purge(int force) {
        struct CitContext *ptr, *rem;
-       struct worker_node **node, *tmp;
-       pthread_t self;
 
-       if ( (time(NULL) - last_purge) < 5 ) return;    /* Too soon, go away */
+       if (force == 0) {
+               if ( (time(NULL) - last_purge) < 5 ) {
+                       return; /* Too soon, go away */
+               }
+       }
        time(&last_purge);
 
        do {
@@ -810,12 +819,7 @@ void dead_session_purge(void) {
 
        } while (rem != NULL);
 
-
-       /* Raise or lower the size of the worker thread pool if such
-        * an action is appropriate.
-        */
-
-       self = pthread_self();
+       /* Raise the size of the worker thread pool if necessary. */
 
        if ( (num_sessions > num_threads)
           && (num_threads < config.c_max_workers) ) {
@@ -823,33 +827,6 @@ void dead_session_purge(void) {
                create_worker();
                end_critical_section(S_WORKER_LIST);
        }
-       
-       /* don't let the initial thread die since it's responsible for
-          waiting for all the other threads to terminate. */
-       else if ( (num_sessions < num_threads)
-          && (num_threads > config.c_min_workers)
-          && (self != initial_thread) ) {
-               cdb_free_tsd();
-               begin_critical_section(S_WORKER_LIST);
-               --num_threads;
-
-               /* we're exiting before server shutdown... unlink ourself from
-                  the worker list and detach our thread to avoid memory leaks
-                */
-
-               for (node = &worker_list; *node != NULL; node = &(*node)->next)
-                       if ((*node)->tid == self) {
-                               tmp = *node;
-                               *node = (*node)->next;
-                               free(tmp);
-                               break;
-                       }
-
-               pthread_detach(self);
-               end_critical_section(S_WORKER_LIST);
-               pthread_exit(NULL);
-       }
-
 }
 
 
@@ -884,37 +861,7 @@ void InitializeMasterCC(void) {
 
 
 
-/*
- * Set up a fd_set containing all the master sockets to which we
- * always listen.  It's computationally less expensive to just copy
- * this to a local fd_set when starting a new select() and then add
- * the client sockets than it is to initialize a new one and then
- * figure out what to put there.
- */
-void init_master_fdset(void) {
-       struct ServiceFunctionHook *serviceptr;
-       int m;
 
-       lprintf(CTDL_DEBUG, "Initializing master fdset\n");
-
-       FD_ZERO(&masterfds);
-       masterhighest = 0;
-
-       lprintf(CTDL_DEBUG, "Will listen on rescan pipe %d\n", rescan[0]);
-       FD_SET(rescan[0], &masterfds);
-       if (rescan[0] > masterhighest) masterhighest = rescan[0];
-
-       for (serviceptr = ServiceHookTable; serviceptr != NULL;
-           serviceptr = serviceptr->next ) {
-               m = serviceptr->msock;
-               lprintf(CTDL_DEBUG, "Will listen on master socket %d\n", m);
-               FD_SET(m, &masterfds);
-               if (m > masterhighest) {
-                       masterhighest = m;
-               }
-       }
-       lprintf(CTDL_DEBUG, "masterhighest = %d\n", masterhighest);
-}
 
 
 /*
@@ -931,10 +878,7 @@ INLINE void become_session(struct CitContext *which_con) {
  */    
 void *worker_thread(void *arg) {
        int i;
-       char junk;
        int highest;
-       /* This is synchronized below; it helps implement round robin mode */
-       static struct CitContext* next_session = NULL;
        struct CitContext *ptr;
        struct CitContext *bind_me = NULL;
        fd_set readfds;
@@ -943,6 +887,8 @@ void *worker_thread(void *arg) {
        struct ServiceFunctionHook *serviceptr;
        int ssock;                      /* Descriptor for client socket */
        struct timeval tv;
+       int force_purge = 0;
+       int m;
 
        num_threads++;
 
@@ -950,26 +896,17 @@ void *worker_thread(void *arg) {
 
        while (!time_to_die) {
 
-               /* 
-                * A naive implementation would have all idle threads
-                * calling select() and then they'd all wake up at once
-                * (known in computer science as the "thundering herd"
-                * problem).  We solve this problem by putting the select()
-                * in a critical section, so only one thread has the
-                * opportunity to wake up.  If we wake up on a master
-                * socket, create a new session context; otherwise, just
-                * bind the thread to the context we want and go on our
-                * merry way.
-                */
-
                /* make doubly sure we're not holding any stale db handles
                 * which might cause a deadlock.
                 */
                cdb_check_handles();
+do_select:     force_purge = 0;
+               bind_me = NULL;         /* Which session shall we handle? */
+
+               /* Initialize the fdset. */
+               FD_ZERO(&readfds);
+               highest = 0;
 
-               begin_critical_section(S_I_WANNA_SELECT);
-SETUP_FD:      memcpy(&readfds, &masterfds, sizeof masterfds);
-               highest = masterhighest;
                begin_critical_section(S_SESSION_TABLE);
                for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
                        if (ptr->state == CON_IDLE) {
@@ -977,17 +914,36 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds);
                                if (ptr->client_socket > highest)
                                        highest = ptr->client_socket;
                        }
+                       if ((bind_me == NULL) && (ptr->state == CON_READY)) {
+                               bind_me = ptr;
+                               ptr->state = CON_EXECUTING;
+                       }
                }
                end_critical_section(S_SESSION_TABLE);
 
-               tv.tv_sec = 1;          /* wake up every second if no input */
-               tv.tv_usec = 0;
+               if (bind_me) goto SKIP_SELECT;
+
+               /* If we got this far, it means that there are no sessions
+                * which a previous thread marked for attention, so we go
+                * ahead and get ready to select().
+                */
 
-               do_select:
-               if (!time_to_die)
+               /* First, add the various master sockets to the fdset. */
+               for (serviceptr = ServiceHookTable; serviceptr != NULL;
+               serviceptr = serviceptr->next ) {
+                       m = serviceptr->msock;
+                       FD_SET(m, &readfds);
+                       if (m > highest) {
+                               highest = m;
+                       }
+               }
+
+               if (!time_to_die) {
+                       tv.tv_sec = 1;          /* wake up every second if no input */
+                       tv.tv_usec = 0;
                        retval = select(highest + 1, &readfds, NULL, NULL, &tv);
+               }
                else {
-                       end_critical_section(S_I_WANNA_SELECT);
                        break;
                }
 
@@ -995,6 +951,11 @@ SETUP_FD:  memcpy(&readfds, &masterfds, sizeof masterfds);
                 * First, check for an error or exit condition.
                 */
                if (retval < 0) {
+                       if (errno == EBADF) {
+                               lprintf(CTDL_NOTICE, "select() failed: (%s)\n",
+                                       strerror(errno));
+                               goto do_select;
+                       }
                        if (errno != EINTR) {
                                lprintf(CTDL_EMERG, "Exiting (%s)\n", strerror(errno));
                                time_to_die = 1;
@@ -1029,6 +990,8 @@ SETUP_FD:  memcpy(&readfds, &masterfds, sizeof masterfds);
                                        con->client_socket = ssock;
                                        con->h_command_function =
                                                serviceptr->h_command_function;
+                                       con->h_async_function =
+                                               serviceptr->h_async_function;
 
                                        /* Determine whether local socket */
                                        if (serviceptr->sockpath != NULL)
@@ -1045,79 +1008,62 @@ SETUP_FD:       memcpy(&readfds, &masterfds, sizeof masterfds);
                                        serviceptr->h_greeting_function();
                                        become_session(NULL);
                                        con->state = CON_IDLE;
-                                       goto SETUP_FD;
+                                       goto do_select;
                                }
                        }
                }
 
-               /* If the rescan pipe went active, someone is telling this
-                * thread that the &readfds needs to be refreshed with more
-                * current data.
-                */
                if (time_to_die) {
-                       end_critical_section(S_I_WANNA_SELECT);
                        break;
                }
 
-               if (FD_ISSET(rescan[0], &readfds)) {
-                       read(rescan[0], &junk, 1);
-                       goto SETUP_FD;
-               }
-
                /* It must be a client socket.  Find a context that has data
-                * waiting on its socket *and* is in the CON_IDLE state.
+                * waiting on its socket *and* is in the CON_IDLE state.  Any
+                * active sockets other than our chosen one are marked as
+                * CON_READY so the next thread that comes around can just bind
+                * to one without having to select() again.
                 */
-               else {
-                       bind_me = NULL;
-                       begin_critical_section(S_SESSION_TABLE);
-                       /*
-                        * We start where we left off.  If we get to the end
-                        * we'll start from the beginning again, then give up
-                        * if we still don't find anything.  This ensures
-                        * that all contexts get a more-or-less equal chance
-                        * to run. And yes, I did add a goto to the code. -IO
-                        */
-find_session:          if (next_session == NULL)
-                               next_session = ContextList;
-                       for (ptr = next_session;
-                           ( (ptr != NULL) && (bind_me == NULL) );
-                           ptr = ptr->next) {
-                               if ( (FD_ISSET(ptr->client_socket, &readfds))
-                                  && (ptr->state == CON_IDLE) ) {
-                                       bind_me = ptr;
+               begin_critical_section(S_SESSION_TABLE);
+               for (ptr = ContextList; ptr != NULL; ptr = ptr->next) {
+                       if ( (FD_ISSET(ptr->client_socket, &readfds))
+                          && (ptr->state != CON_EXECUTING) ) {
+                               ptr->input_waiting = 1;
+                               if (!bind_me) {
+                                       bind_me = ptr;  /* I choose you! */
+                                       bind_me->state = CON_EXECUTING;
+                               }
+                               else {
+                                       ptr->state = CON_READY;
                                }
                        }
-                       if (bind_me != NULL) {
-                               /* Found one.  Stake a claim to it before
-                                * letting anyone else touch the context list.
-                                */
-                               bind_me->state = CON_EXECUTING;
-                               next_session = bind_me->next;
-                       } else if (next_session == ContextList) {
-                               next_session = NULL;
-                       }
-                       if (bind_me == NULL && next_session != NULL) {
-                               next_session = NULL;
-                               goto find_session;
-                       }
+               }
+               end_critical_section(S_SESSION_TABLE);
 
-                       end_critical_section(S_SESSION_TABLE);
-                       end_critical_section(S_I_WANNA_SELECT);
+SKIP_SELECT:
+               /* We're bound to a session */
+               if (bind_me != NULL) {
+                       become_session(bind_me);
 
-                       /* We're bound to a session, now do *one* command */
-                       if (bind_me != NULL) {
-                               become_session(bind_me);
+                       /* If the client has sent a command, execute it. */
+                       if (CC->input_waiting) {
                                CC->h_command_function();
-                               become_session(NULL);
-                               bind_me->state = CON_IDLE;
-                               if (bind_me->kill_me == 1) {
-                                       RemoveContext(bind_me);
-                               } 
-                               write(rescan[1], &junk, 1);
+                               CC->input_waiting = 0;
                        }
 
+                       /* If there are asynchronous messages waiting and the
+                        * client supports it, do those now */
+                       if ((CC->is_async) && (CC->async_waiting)
+                          && (CC->h_async_function != NULL)) {
+                               CC->h_async_function();
+                               CC->async_waiting = 0;
+                       }
+                       
+                       force_purge = CC->kill_me;
+                       become_session(NULL);
+                       bind_me->state = CON_IDLE;
                }
-               dead_session_purge();
+
+               dead_session_purge(force_purge);
                do_housekeeping();
                check_sched_shutdown();
        }
@@ -1271,7 +1217,8 @@ void dump_heap(void) {
        struct igheap *thisheap;
 
        for (thisheap = igheap; thisheap != NULL; thisheap = thisheap->next) {
-               lprintf(CTDL_DEBUG, "%30s : %d\n", thisheap->file, thisheap->line);
+               lprintf(CTDL_CRIT, "UNFREED: %30s : %d\n",
+                       thisheap->file, thisheap->line);
        }
 }