* more merges
authorArt Cancro <ajc@citadel.org>
Tue, 9 Jan 2001 06:05:10 +0000 (06:05 +0000)
committerArt Cancro <ajc@citadel.org>
Tue, 9 Jan 2001 06:05:10 +0000 (06:05 +0000)
citadel/database.h
citadel/database_sleepycat.c
citadel/server.h
citadel/sysdep.c
citadel/sysdep_decls.h

index 399f2013a308a84bf00a84535d09c4eac1c40f97..3df6edf62bfcbcac3ed75e09234f56d3e1cc134d 100644 (file)
@@ -10,3 +10,7 @@ void cdb_rewind (int cdb);
 struct cdbdata *cdb_next_item (int cdb);
 void cdb_begin_transaction(void);
 void cdb_end_transaction(void);
+void cdb_allocate_tsd(void);
+void cdb_free_tsd(void);
+void cdb_release_handles(void);
+
index d5c0c4e4948fd70e2473337cf637cd34ec7b9468..82d886e8989a4fd994a0641585e1b7d333c3751b 100644 (file)
@@ -170,7 +170,7 @@ void open_databases(void)
 {
        int ret;
        int i;
-       char dbfilename[256];
+       char dbfilename[SIZ];
        u_int32_t flags = 0;
 
        lprintf(9, "open_databases() starting\n");
index 55d81c747df76c1639e6a1822a2cef5e23d4147a..bfb95ccba49bfb391d78ba3f48efa87786df422e 100644 (file)
@@ -174,6 +174,7 @@ enum {
        S_SUPPMSGMAIN,
        S_I_WANNA_SELECT,
        S_CONFIG,
+       S_WORKER_LIST,
        MAX_SEMAPHORES
 };
 
index 107d40df57e174f8a79c204ba88ac2a693885252..74a71d63b9c3507987d0fe90c8097076e565cfd6 100644 (file)
@@ -72,7 +72,7 @@ int verbosity = DEFAULT_VERBOSITY;            /* Logging level */
 struct CitContext masterCC;
 int rescan[2];                                 /* The Rescan Pipe */
 time_t last_purge = 0;                         /* Last dead session purge */
-int num_threads = 0;                           /* Current number of threads */
+static int num_threads = 0;                    /* Current number of threads */
 int num_sessions = 0;                          /* Current number of sessions */
 
 fd_set masterfds;                              /* Master sockets etc. */
@@ -80,6 +80,8 @@ int masterhighest;
 
 time_t last_timer = 0L;                                /* Last timer hook processing */
 
+static pthread_t initial_thread;               /* tid for main() thread */
+
 
 /*
  * lprintf()  ...   Write logging information
@@ -194,7 +196,7 @@ void dump_tracked() {
  * that it's time to call master_cleanup() and exit.
  */
 
-static volatile int time_to_die = 0;
+volatile int time_to_die = 0;
 
 static RETSIGTYPE signal_cleanup(int signum) {
        time_to_die = 1;
@@ -712,6 +714,37 @@ int convert_login(char NameToConvert[]) {
        }
 }
 
+static struct worker_node {
+       pthread_t tid;
+       struct worker_node *next;
+} *worker_list = NULL;
+
+
+/*
+ * create a worker thread. this function must always be called from within
+ * an S_WORKER_LIST critical section!
+ */
+static void create_worker(void) {
+       int ret;
+       struct worker_node *n = mallok(sizeof *n);
+
+       if (n == NULL) {
+               lprintf(1, "can't allocate worker_node, exiting\n");
+               time_to_die = -1;
+               return;
+       }
+
+       if ((ret = pthread_create(&n->tid, NULL, worker_thread, NULL) != 0))
+       {
+
+               lprintf(1, "Can't create worker thread: %s\n",
+                       strerror(ret));
+       }
+
+       n->next = worker_list;
+       worker_list = n;
+}
+
 
 
 /*
@@ -725,8 +758,8 @@ int convert_login(char NameToConvert[]) {
  */
 void dead_session_purge(void) {
        struct CitContext *ptr, *rem;
-        pthread_attr_t attr;
-       pthread_t newthread;
+       struct worker_node **node, *tmp;
+       pthread_t self;
 
        if ( (time(NULL) - last_purge) < 5 ) return;    /* Too soon, go away */
        time(&last_purge);
@@ -756,22 +789,38 @@ void dead_session_purge(void) {
         * an action is appropriate.
         */
 
+       self = pthread_self();
+
        if ( (num_sessions > num_threads)
           && (num_threads < config.c_max_workers) ) {
-
-               pthread_attr_init(&attr);
-                       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-               if (pthread_create(&newthread, &attr,
-                  (void* (*)(void*)) worker_thread, NULL) != 0) {
-                       lprintf(1, "Can't create worker thead: %s\n",
-                       strerror(errno));
-               }
-
+               begin_critical_section(S_WORKER_LIST);
+               create_worker();
+               end_critical_section(S_WORKER_LIST);
        }
        
+       /* don't let the initial thread die since it's responsible for
+          waiting for all the other threads to terminate. */
        else if ( (num_sessions < num_threads)
-          && (num_threads > config.c_min_workers) ) {
+          && (num_threads > config.c_min_workers)
+          && (self != initial_thread) ) {
+               cdb_free_tsd();
+               begin_critical_section(S_WORKER_LIST);
                --num_threads;
+
+               /* we're exiting before server shutdown... unlink ourself from
+                  the worker list and detach our thread to avoid memory leaks
+                */
+
+               for (node = &worker_list; *node != NULL; node = &(*node)->next)
+                       if ((*node)->tid == self) {
+                               tmp = *node;
+                               *node = (*node)->next;
+                               phree(tmp);
+                               break;
+                       }
+
+               pthread_detach(self);
+               end_critical_section(S_WORKER_LIST);
                pthread_exit(NULL);
        }
 
@@ -848,13 +897,12 @@ void init_master_fdset(void) {
  */
 int main(int argc, char **argv)
 {
-       pthread_t WorkerThread; /* Thread descriptor */
-        pthread_attr_t attr;           /* Thread attributes */
        char tracefile[128];            /* Name of file to log traces to */
        int a, i;                       /* General-purpose variables */
        struct passwd *pw;
        int drop_root_perms = 1;
        char *moddir;
+       struct worker_node *wnp;
         
        /* specify default port name and trace file */
        strcpy(tracefile, "");
@@ -995,25 +1043,39 @@ int main(int argc, char **argv)
 
        /* We want to check for idle sessions once per minute */
        CtdlRegisterSessionHook(terminate_idle_sessions, EVT_TIMER);
-       
+
        /*
         * Now create a bunch of worker threads.
         */
+       lprintf(9, "Starting %d worker threads\n", config.c_min_workers-1);
+       begin_critical_section(S_WORKER_LIST);
        for (i=0; i<(config.c_min_workers-1); ++i) {
-               lprintf(9, "Creating worker thread %d\n", i);
-               pthread_attr_init(&attr);
-                       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-               if (pthread_create(&WorkerThread, &attr,
-                  (void* (*)(void*)) worker_thread, NULL) != 0) {
-                       lprintf(1, "Can't create worker thead: %s\n",
-                       strerror(errno));
-               }
+               create_worker();
        }
-
+       end_critical_section(S_WORKER_LIST);
 
        /* Now this thread can become a worker as well. */
-       lprintf(9, "Original thread entering worker loop\n");
-       worker_thread();
+       initial_thread = pthread_self();
+       worker_thread(NULL);
+
+       /* Server is exiting. Wait for workers to shutdown. */
+       lprintf(7, "Waiting for worker threads to shut down\n");
+
+       begin_critical_section(S_WORKER_LIST);
+       while (worker_list != NULL) {
+               wnp = worker_list;
+               worker_list = wnp->next;
+
+               /* avoid deadlock with an exiting thread */
+               end_critical_section(S_WORKER_LIST);
+               if ((i = pthread_join(wnp->tid, NULL)))
+                       lprintf(1, "pthread_join: %s\n", strerror(i));
+               phree(wnp);
+               begin_critical_section(S_WORKER_LIST);
+       }
+       end_critical_section(S_WORKER_LIST);
+
+       master_cleanup();
 
        return(0);
 }
@@ -1031,7 +1093,7 @@ inline void become_session(struct CitContext *which_con) {
 /* 
  * This loop just keeps going and going and going...
  */    
-void worker_thread(void) {
+void *worker_thread(void *arg) {
        int i;
        char junk;
        int highest;
@@ -1046,7 +1108,9 @@ void worker_thread(void) {
        int ssock;                      /* Descriptor for client socket */
        struct timeval tv;
 
-       ++num_threads;
+       num_threads++;
+
+       cdb_allocate_tsd();
 
        while (!time_to_die) {
 
@@ -1060,6 +1124,11 @@ void worker_thread(void) {
                 * context we want and go on our merry way.
                 */
 
+               /* make doubly sure we're not holding any stale db handles
+                * which might cause a deadlock.
+                */
+               cdb_release_handles();
+
                begin_critical_section(S_I_WANNA_SELECT);
 SETUP_FD:      memcpy(&readfds, &masterfds, sizeof masterfds);
                highest = masterhighest;
@@ -1073,17 +1142,26 @@ SETUP_FD:       memcpy(&readfds, &masterfds, sizeof masterfds);
                }
                end_critical_section(S_SESSION_TABLE);
 
-               tv.tv_sec = 60;         /* wake up every minute if no input */
+               tv.tv_sec = 1;          /* wake up every second if no input */
                tv.tv_usec = 0;
-               retval = select(highest + 1, &readfds, NULL, NULL, &tv);
+
+               do_select:
+               if (!time_to_die)
+                       retval = select(highest + 1, &readfds, NULL, NULL, &tv);
+               else {
+                       end_critical_section(S_I_WANNA_SELECT);
+                       break;
+               }
 
                /* Now figure out who made this select() unblock.
                 * First, check for an error or exit condition.
                 */
                if (retval < 0) {
-                       end_critical_section(S_I_WANNA_SELECT);
-                       lprintf(9, "Exiting (%s)\n", strerror(errno));
-                       time_to_die = 1;
+                       if (errno != EINTR) {
+                               lprintf(9, "Exiting (%s)\n", strerror(errno));
+                               time_to_die = 1;
+                       } else if (!time_to_die)
+                               goto do_select;
                }
 
                /* Next, check to see if it's a new client connecting
@@ -1139,8 +1217,10 @@ SETUP_FD:        memcpy(&readfds, &masterfds, sizeof masterfds);
                 * thread that the &readfds needs to be refreshed with more
                 * current data.
                 */
-               if (time_to_die)
+               if (time_to_die) {
+                       end_critical_section(S_I_WANNA_SELECT);
                        break;
+               }
 
                if (FD_ISSET(rescan[0], &readfds)) {
                        read(rescan[0], &junk, 1);
@@ -1187,6 +1267,7 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds);
                dead_session_purge();
                if ((time(NULL) - last_timer) > 60L) {
                        last_timer = time(NULL);
+                       cdb_release_handles(); /* suggested by Justin Case */
                        PerformSessionHooks(EVT_TIMER);
                }
 
@@ -1194,8 +1275,9 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds);
        }
 
        /* If control reaches this point, the server is shutting down */        
-       master_cleanup();
        --num_threads;
-       pthread_exit(NULL);
+       return NULL;
 }
 
+
+
index 13d5c931fd240472b716ae1635f3ad9513f8d9fe..1398f30b05bc0f8b8aecaf15a37ac77ba4d39b88 100644 (file)
@@ -19,8 +19,9 @@ void *sd_context_loop (struct CitContext *con);
 void start_daemon (int do_close_stdio);
 void cmd_nset (char *cmdbuf);
 int convert_login (char *NameToConvert);
-void worker_thread (void);
+void *worker_thread (void *arg);
 inline void become_session(struct CitContext *which_con);
 void CtdlRedirectOutput(FILE *fp, int sock);
 
 extern int num_sessions;
+extern int time_to_die;