X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fsysdep.c;h=655bb7249286fc26975c3acd563f76b0dbb5e663;hb=fe4a9ff9d1b33197fb5dfc45567b98c63049d7b6;hp=35a3d8f43797bd95671f9b60d92fa0a075efa258;hpb=a79391ff6cdd28c07f85c91a375b1b15ad7bd2e4;p=citadel.git diff --git a/citadel/sysdep.c b/citadel/sysdep.c index 35a3d8f43..655bb7249 100644 --- a/citadel/sysdep.c +++ b/citadel/sysdep.c @@ -1,14 +1,15 @@ /* + * $Id$ + * * Citadel/UX "system dependent" stuff. * See copyright.txt for copyright information. * - * $Id$ - * - * Here's where we (hopefully) have all the parts of the Citadel server that + * Here's where we (hopefully) have most parts of the Citadel server that * would need to be altered to run the server in a non-POSIX environment. * - * Eventually we'll try porting to a different platform and either have - * multiple variants of this file or simply load it up with #ifdefs. + * If we ever port to a different platform and either have multiple + * variants of this file or simply load it up with #ifdefs. + * */ @@ -20,12 +21,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -69,31 +72,51 @@ int verbosity = DEFAULT_VERBOSITY; /* Logging level */ struct CitContext masterCC; int rescan[2]; /* The Rescan Pipe */ time_t last_purge = 0; /* Last dead session purge */ -int num_threads = 0; /* Current number of threads */ +static int num_threads = 0; /* Current number of threads */ int num_sessions = 0; /* Current number of sessions */ fd_set masterfds; /* Master sockets etc. */ int masterhighest; +time_t last_timer = 0L; /* Last timer hook processing */ + +static pthread_t initial_thread; /* tid for main() thread */ + /* * lprintf() ... Write logging information + * + * Note: the variable "buf" below needs to be large enough to handle any + * log data sent through this function. BE CAREFUL! */ void lprintf(int loglevel, const char *format, ...) { va_list arg_ptr; - char buf[512]; + char buf[4096]; va_start(arg_ptr, format); vsprintf(buf, format, arg_ptr); va_end(arg_ptr); if (loglevel <= verbosity) { - fprintf(stderr, "%s", buf); + struct timeval tv; + struct tm *tim; + + gettimeofday(&tv, NULL); + tim = localtime(&(tv.tv_sec)); + /* + * Log provides millisecond accuracy. If you need + * microsecond accuracy and your OS supports it, change + * %03ld to %06ld and remove " / 1000" after tv.tv_usec. + */ + fprintf(stderr, "%04d/%02d/%02d %2d:%02d:%02d.%03ld %s", + tim->tm_year + 1900, tim->tm_mon + 1, tim->tm_mday, + tim->tm_hour, tim->tm_min, tim->tm_sec, + tv.tv_usec / 1000, buf); fflush(stderr); - } + } PerformLogHooks(loglevel, buf); - } +} @@ -116,7 +139,7 @@ void *tracked_malloc(size_t tsize, char *tfile, int tline) { hptr->h_ptr = ptr; heap = hptr; return ptr; - } +} char *tracked_strdup(const char *orig, char *tfile, int tline) { char *s; @@ -135,19 +158,19 @@ void tracked_free(void *ptr) { hptr = heap->next; free(heap); heap = hptr; - } + } else { for (hptr=heap; hptr->next!=NULL; hptr=hptr->next) { if (hptr->next->h_ptr == ptr) { freeme = hptr->next; hptr->next = hptr->next->next; free(freeme); - } } } + } free(ptr); - } +} void *tracked_realloc(void *ptr, size_t size) { void *newptr; @@ -157,10 +180,10 @@ void *tracked_realloc(void *ptr, size_t size) { for (hptr=heap; hptr!=NULL; hptr=hptr->next) { if (hptr->h_ptr == ptr) hptr->h_ptr = newptr; - } + } return newptr; - } +} void dump_tracked() { @@ -170,18 +193,18 @@ void dump_tracked() { for (hptr=heap; hptr!=NULL; hptr=hptr->next) { cprintf("%20s %5d\n", hptr->h_file, hptr->h_line); - } + } #ifdef __GNUC__ malloc_stats(); #endif cprintf("000\n"); - } +} #endif /* - * we used to use master_cleanup() as a signal handler to shut down the server. + * We used to use master_cleanup() as a signal handler to shut down the server. * however, master_cleanup() and the functions it calls do some things that * aren't such a good idea to do from a signal handler: acquiring mutexes, * playing with signal masks on BSDI systems, etc. so instead we install the @@ -189,7 +212,7 @@ void dump_tracked() { * that it's time to call master_cleanup() and exit. */ -static volatile int time_to_die = 0; +volatile int time_to_die = 0; static RETSIGTYPE signal_cleanup(int signum) { time_to_die = 1; @@ -210,7 +233,8 @@ void init_sysdep(void) { /* * Set up a place to put thread-specific data. * We only need a single pointer per thread - it points to the - * thread's CitContext structure in the ContextList linked list. + * CitContext structure (in the ContextList linked list) of the + * session to which the calling thread is currently bound. */ if (pthread_key_create(&MyConKey, NULL) != 0) { lprintf(1, "Can't create TSD key!! %s\n", strerror(errno)); @@ -239,6 +263,10 @@ void init_sysdep(void) { */ void begin_critical_section(int which_one) { + /* lprintf(9, "begin_critical_section(%d)\n", which_one); */ + /* ensure nobody ever tries to do a critical section within a + transaction; this could lead to deadlock. */ + cdb_check_handles(); pthread_mutex_lock(&Critters[which_one]); } @@ -247,6 +275,7 @@ void begin_critical_section(int which_one) */ void end_critical_section(int which_one) { + /* lprintf(9, "end_critical_section(%d)\n", which_one); */ pthread_mutex_unlock(&Critters[which_one]); } @@ -255,44 +284,94 @@ void end_critical_section(int which_one) /* * This is a generic function to set up a master socket for listening on * a TCP port. The server shuts down if the bind fails. + * */ int ig_tcp_server(int port_number, int queue_len) { struct sockaddr_in sin; int s, i; + int actual_queue_len; + + actual_queue_len = queue_len; + if (actual_queue_len < 5) actual_queue_len = 5; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; - - if (port_number == 0) { - lprintf(1, "citserver: illegal port number specified\n"); - return(-1); - } - sin.sin_port = htons((u_short)port_number); - s = socket(PF_INET, SOCK_STREAM, (getprotobyname("tcp")->p_proto)); + s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (s < 0) { lprintf(1, "citserver: Can't create a socket: %s\n", strerror(errno)); return(-1); } - /* Set the SO_REUSEADDR socket option, because it makes sense. */ i = 1; setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)); if (bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) { - lprintf(1, "citserver: Can't bind: %s\n", strerror(errno)); + lprintf(1, "citserver: Can't bind: %s\n", + strerror(errno)); + close(s); + return(-1); + } + + if (listen(s, actual_queue_len) < 0) { + lprintf(1, "citserver: Can't listen: %s\n", strerror(errno)); + close(s); + return(-1); + } + + return(s); +} + + + +/* + * Create a Unix domain socket and listen on it + */ +int ig_uds_server(char *sockpath, int queue_len) +{ + struct sockaddr_un addr; + int s; + int i; + int actual_queue_len; + + actual_queue_len = queue_len; + if (actual_queue_len < 5) actual_queue_len = 5; + + i = unlink(sockpath); + if (i != 0) if (errno != ENOENT) { + lprintf(1, "citserver: can't unlink %s: %s\n", + sockpath, strerror(errno)); + return(-1); + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + safestrncpy(addr.sun_path, sockpath, sizeof addr.sun_path); + + s = socket(AF_UNIX, SOCK_STREAM, 0); + if (s < 0) { + lprintf(1, "citserver: Can't create a socket: %s\n", + strerror(errno)); + return(-1); + } + + if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + lprintf(1, "citserver: Can't bind: %s\n", + strerror(errno)); return(-1); } - if (listen(s, queue_len) < 0) { + if (listen(s, actual_queue_len) < 0) { lprintf(1, "citserver: Can't listen: %s\n", strerror(errno)); return(-1); } + chmod(sockpath, 0777); return(s); } @@ -312,7 +391,11 @@ struct CitContext *MyContext(void) { /* - * Initialize a new context and place it in the list. + * Initialize a new context and place it in the list. The session number + * used to be the PID (which is why it's called cs_pid), but that was when we + * had one process per session. Now we just assign them sequentially, starting + * at 1 (don't change it to 0 because masterCC uses 0) and re-using them when + * sessions terminate. */ struct CitContext *CreateNewContext(void) { struct CitContext *me, *ptr; @@ -363,13 +446,28 @@ void client_write(char *buf, int nbytes) { int bytes_written = 0; int retval; + int sock; + + + if (CC->redirect_fp != NULL) { + fwrite(buf, nbytes, 1, CC->redirect_fp); + return; + } + + if (CC->redirect_sock > 0) { + sock = CC->redirect_sock; /* and continue below... */ + } + else { + sock = CC->client_socket; + } + while (bytes_written < nbytes) { - retval = write(CC->client_socket, &buf[bytes_written], + retval = write(sock, &buf[bytes_written], nbytes - bytes_written); if (retval < 1) { lprintf(2, "client_write() failed: %s\n", strerror(errno)); - CC->kill_me = 1; + if (sock == CC->client_socket) CC->kill_me = 1; return; } bytes_written = bytes_written + retval; @@ -384,7 +482,7 @@ void client_write(char *buf, int nbytes) */ void cprintf(const char *format, ...) { va_list arg_ptr; - char buf[256]; + char buf[SIZ]; va_start(arg_ptr, format); if (vsnprintf(buf, sizeof buf, format, arg_ptr) == -1) @@ -459,13 +557,13 @@ int client_gets(char *buf) */ for (i = 0;;i++) { retval = client_read(&buf[i], 1); - if (retval != 1 || buf[i] == '\n' || i == 255) + if (retval != 1 || buf[i] == '\n' || i == (SIZ-1)) break; } /* If we got a long line, discard characters until the newline. */ - if (i == 255) + if (i == (SIZ-1)) while (buf[i] != '\n' && retval == 1) retval = client_read(&buf[i], 1); @@ -474,6 +572,7 @@ int client_gets(char *buf) buf[i] = 0; while ((strlen(buf)>0)&&(!isprint(buf[strlen(buf)-1]))) buf[strlen(buf)-1] = 0; + if (retval < 0) strcpy(buf, "000"); return(retval); } @@ -483,7 +582,29 @@ int client_gets(char *buf) * The system-dependent part of master_cleanup() - close the master socket. */ void sysdep_master_cleanup(void) { - /* FIX close all protocol master sockets here */ + struct ServiceFunctionHook *serviceptr; + + /* + * close all protocol master sockets + */ + for (serviceptr = ServiceHookTable; serviceptr != NULL; + serviceptr = serviceptr->next ) { + + if (serviceptr->tcp_port > 0) + lprintf(3, "Closing listener on port %d\n", + serviceptr->tcp_port); + + if (serviceptr->sockpath != NULL) + lprintf(3, "Closing listener on '%s'\n", + serviceptr->sockpath); + + close(serviceptr->msock); + + /* If it's a Unix domain socket, remove the file. */ + if (serviceptr->sockpath != NULL) { + unlink(serviceptr->sockpath); + } + } } @@ -536,7 +657,7 @@ void cmd_nset(char *cmdbuf) FILE *netsetup; int ch; int a, b; - char netsetup_args[3][256]; + char netsetup_args[3][SIZ]; if (CC->usersupp.axlevel < 6) { cprintf("%d Higher access required.\n", @@ -612,6 +733,37 @@ int convert_login(char NameToConvert[]) { } } +static struct worker_node { + pthread_t tid; + struct worker_node *next; +} *worker_list = NULL; + + +/* + * create a worker thread. this function must always be called from within + * an S_WORKER_LIST critical section! + */ +static void create_worker(void) { + int ret; + struct worker_node *n = mallok(sizeof *n); + + if (n == NULL) { + lprintf(1, "can't allocate worker_node, exiting\n"); + time_to_die = -1; + return; + } + + if ((ret = pthread_create(&n->tid, NULL, worker_thread, NULL) != 0)) + { + + lprintf(1, "Can't create worker thread: %s\n", + strerror(ret)); + } + + n->next = worker_list; + worker_list = n; +} + /* @@ -625,8 +777,8 @@ int convert_login(char NameToConvert[]) { */ void dead_session_purge(void) { struct CitContext *ptr, *rem; - pthread_attr_t attr; - pthread_t newthread; + struct worker_node **node, *tmp; + pthread_t self; if ( (time(NULL) - last_purge) < 5 ) return; /* Too soon, go away */ time(&last_purge); @@ -656,47 +808,127 @@ void dead_session_purge(void) { * an action is appropriate. */ + self = pthread_self(); + if ( (num_sessions > num_threads) && (num_threads < config.c_max_workers) ) { - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&newthread, &attr, - (void* (*)(void*)) worker_thread, NULL) != 0) { - lprintf(1, "Can't create worker thead: %s\n", - strerror(errno)); - } - + begin_critical_section(S_WORKER_LIST); + create_worker(); + end_critical_section(S_WORKER_LIST); } + /* don't let the initial thread die since it's responsible for + waiting for all the other threads to terminate. */ else if ( (num_sessions < num_threads) - && (num_threads > config.c_min_workers) ) { + && (num_threads > config.c_min_workers) + && (self != initial_thread) ) { + cdb_free_tsd(); + begin_critical_section(S_WORKER_LIST); --num_threads; + + /* we're exiting before server shutdown... unlink ourself from + the worker list and detach our thread to avoid memory leaks + */ + + for (node = &worker_list; *node != NULL; node = &(*node)->next) + if ((*node)->tid == self) { + tmp = *node; + *node = (*node)->next; + phree(tmp); + break; + } + + pthread_detach(self); + end_critical_section(S_WORKER_LIST); pthread_exit(NULL); } } - + + + +/* + * Redirect a session's output to a file or socket. + * This function may be called with a file handle *or* a socket (but not + * both). Call with neither to return output to its normal client socket. + */ +void CtdlRedirectOutput(FILE *fp, int sock) { + + if (fp != NULL) CC->redirect_fp = fp; + else CC->redirect_fp = NULL; + + if (sock > 0) CC->redirect_sock = sock; + else CC->redirect_sock = (-1); + +} + + +/* + * masterCC is the context we use when not attached to a session. This + * function initializes it. + */ +void InitializeMasterCC(void) { + memset(&masterCC, 0, sizeof(struct CitContext)); + masterCC.internal_pgm = 1; + masterCC.cs_pid = 0; +} + + + +/* + * Set up a fd_set containing all the master sockets to which we + * always listen. It's computationally less expensive to just copy + * this to a local fd_set when starting a new select() and then add + * the client sockets than it is to initialize a new one and then + * figure out what to put there. + */ +void init_master_fdset(void) { + struct ServiceFunctionHook *serviceptr; + int m; + + lprintf(9, "Initializing master fdset\n"); + + FD_ZERO(&masterfds); + masterhighest = 0; + + lprintf(9, "Will listen on rescan pipe %d\n", rescan[0]); + FD_SET(rescan[0], &masterfds); + if (rescan[0] > masterhighest) masterhighest = rescan[0]; + + for (serviceptr = ServiceHookTable; serviceptr != NULL; + serviceptr = serviceptr->next ) { + m = serviceptr->msock; + lprintf(9, "Will listen on master socket %d\n", m); + FD_SET(m, &masterfds); + if (m > masterhighest) { + masterhighest = m; + } + } + lprintf(9, "masterhighest = %d\n", masterhighest); +} + + /* * Here's where it all begins. */ int main(int argc, char **argv) { - pthread_t HousekeepingThread; /* Thread descriptor */ - pthread_attr_t attr; /* Thread attributes */ char tracefile[128]; /* Name of file to log traces to */ int a, i; /* General-purpose variables */ struct passwd *pw; int drop_root_perms = 1; char *moddir; - struct ServiceFunctionHook *serviceptr; + struct worker_node *wnp; /* specify default port name and trace file */ strcpy(tracefile, ""); + /* initialize the master context */ + InitializeMasterCC(); + /* parse command-line arguments */ for (a=1; a masterhighest) masterhighest = rescan[0]; - - for (serviceptr = ServiceHookTable; serviceptr != NULL; - serviceptr = serviceptr->next ) { - serviceptr->msock = ig_tcp_server( - serviceptr->tcp_port, config.c_maxsessions); - if (serviceptr->msock >= 0) { - FD_SET(serviceptr->msock, &masterfds); - if (serviceptr->msock > masterhighest) - masterhighest = serviceptr->msock; - lprintf(7, "Bound to port %-5d (socket %d)\n", - serviceptr->tcp_port, - serviceptr->msock); - } - else { - lprintf(1, "Unable to bind to port %d\n", - serviceptr->tcp_port); - } - } - + init_master_fdset(); /* * Now that we've bound the sockets, change to the BBS user id and its @@ -847,41 +1060,48 @@ int main(int argc, char **argv) } } - /* - * Create the housekeeper thread - */ - lprintf(7, "Starting housekeeper thread\n"); - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&HousekeepingThread, &attr, - (void* (*)(void*)) housekeeping_loop, NULL) != 0) { - lprintf(1, "Can't create housekeeping thead: %s\n", - strerror(errno)); - } - + /* We want to check for idle sessions once per minute */ + CtdlRegisterSessionHook(terminate_idle_sessions, EVT_TIMER); /* * Now create a bunch of worker threads. */ + lprintf(9, "Starting %d worker threads\n", config.c_min_workers-1); + begin_critical_section(S_WORKER_LIST); for (i=0; i<(config.c_min_workers-1); ++i) { - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&HousekeepingThread, &attr, - (void* (*)(void*)) worker_thread, NULL) != 0) { - lprintf(1, "Can't create worker thead: %s\n", - strerror(errno)); - } + create_worker(); } + end_critical_section(S_WORKER_LIST); /* Now this thread can become a worker as well. */ - worker_thread(); + initial_thread = pthread_self(); + worker_thread(NULL); + + /* Server is exiting. Wait for workers to shutdown. */ + lprintf(7, "Waiting for worker threads to shut down\n"); + + begin_critical_section(S_WORKER_LIST); + while (worker_list != NULL) { + wnp = worker_list; + worker_list = wnp->next; + + /* avoid deadlock with an exiting thread */ + end_critical_section(S_WORKER_LIST); + if ((i = pthread_join(wnp->tid, NULL))) + lprintf(1, "pthread_join: %s\n", strerror(i)); + phree(wnp); + begin_critical_section(S_WORKER_LIST); + } + end_critical_section(S_WORKER_LIST); + + master_cleanup(); return(0); } /* - * Bind a thread to a context. + * Bind a thread to a context. (It's inline merely to speed things up.) */ inline void become_session(struct CitContext *which_con) { pthread_setspecific(MyConKey, (void *)which_con ); @@ -892,7 +1112,7 @@ inline void become_session(struct CitContext *which_con) { /* * This loop just keeps going and going and going... */ -void worker_thread(void) { +void *worker_thread(void *arg) { int i; char junk; int highest; @@ -905,8 +1125,12 @@ void worker_thread(void) { struct sockaddr_in fsin; /* Data for master socket */ int alen; /* Data for master socket */ int ssock; /* Descriptor for client socket */ + struct timeval tv; + + num_threads++; + + cdb_allocate_tsd(); - ++num_threads; while (!time_to_die) { /* @@ -914,13 +1138,18 @@ void worker_thread(void) { * calling select() and then they'd all wake up at once. We * solve this problem by putting the select() in a critical * section, so only one thread has the opportunity to wake - * up. If we wake up on the master socket, create a new + * up. If we wake up on a master socket, create a new * session context; otherwise, just bind the thread to the * context we want and go on our merry way. */ + /* make doubly sure we're not holding any stale db handles + * which might cause a deadlock. + */ + cdb_check_handles(); + begin_critical_section(S_I_WANNA_SELECT); -SETUP_FD: memcpy(&readfds, &masterfds, sizeof(fd_set) ); +SETUP_FD: memcpy(&readfds, &masterfds, sizeof masterfds); highest = masterhighest; begin_critical_section(S_SESSION_TABLE); for (ptr = ContextList; ptr != NULL; ptr = ptr->next) { @@ -932,19 +1161,30 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof(fd_set) ); } end_critical_section(S_SESSION_TABLE); - retval = select(highest + 1, &readfds, NULL, NULL, NULL); + tv.tv_sec = 1; /* wake up every second if no input */ + tv.tv_usec = 0; + + do_select: + if (!time_to_die) + retval = select(highest + 1, &readfds, NULL, NULL, &tv); + else { + end_critical_section(S_I_WANNA_SELECT); + break; + } /* Now figure out who made this select() unblock. * First, check for an error or exit condition. */ if (retval < 0) { - end_critical_section(S_I_WANNA_SELECT); - lprintf(9, "Exiting (%s)\n", strerror(errno)); - time_to_die = 1; + if (errno != EINTR) { + lprintf(9, "Exiting (%s)\n", strerror(errno)); + time_to_die = 1; + } else if (!time_to_die) + goto do_select; } /* Next, check to see if it's a new client connecting - * on the master socket. + * on a master socket. */ else for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) { @@ -971,6 +1211,10 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof(fd_set) ); con->client_socket = ssock; con->h_command_function = serviceptr->h_command_function; + + /* Determine whether local socket */ + if (serviceptr->sockpath != NULL) + con->is_local_socket = 1; /* Set the SO_REUSEADDR socket option */ i = 1; @@ -992,7 +1236,12 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof(fd_set) ); * thread that the &readfds needs to be refreshed with more * current data. */ - if (!time_to_die) if (FD_ISSET(rescan[0], &readfds)) { + if (time_to_die) { + end_critical_section(S_I_WANNA_SELECT); + break; + } + + if (FD_ISSET(rescan[0], &readfds)) { read(rescan[0], &junk, 1); goto SETUP_FD; } @@ -1032,17 +1281,22 @@ SETUP_FD: memcpy(&readfds, &masterfds, sizeof(fd_set) ); } write(rescan[1], &junk, 1); } - else { - lprintf(9, "Thread found nothing to do!\n"); - } } dead_session_purge(); + if ((time(NULL) - last_timer) > 60L) { + last_timer = time(NULL); + cdb_check_handles(); /* suggested by Justin Case */ + PerformSessionHooks(EVT_TIMER); + } + + check_sched_shutdown(); } /* If control reaches this point, the server is shutting down */ - master_cleanup(); --num_threads; - pthread_exit(NULL); + return NULL; } + +