+
+
+
+
+/*
+ * Starting a new implimentation of a worker thread.
+ * This new implimentation will be faster and do more work per thread.
+ */
+
+/*
+ * Select on master socket.
+ * First worker thread in here acquires the lock and builds an FDSET of master sockets.
+ * then it goes into a loop selecting on the master sockets timing out every few milliseconds.
+ * If it times out it rebiulds its list and loops.
+ * If the select succeeds it creates a new context and returns.
+ * During this time the other workers are selecting on existing contexts or sleeping.
+ */
+void select_on_master(void)
+{
+ fd_set readfds;
+ struct ServiceFunctionHook *serviceptr;
+ int ssock; /* Descriptor for client socket */
+ int highest;
+ int m, i;
+ int retval = 0;
+ struct timeval tv;
+ struct CitContext *con;
+ const char *old_name;
+
+
+
+ old_name = CtdlThreadName("select_on_master");
+
+ /* Initialize the fdset. */
+ FD_ZERO(&readfds);
+ highest = 0;
+
+ /* First, add the various master sockets to the fdset. */
+ for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) {
+ m = serviceptr->msock;
+ FD_SET(m, &readfds);
+ if (m > highest) {
+ highest = m;
+ }
+ }
+
+ tv.tv_sec = 1; /* wake up every 1 sec if no input */
+ tv.tv_usec = 0;
+ retval = CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv);
+
+ /* Select got an error or we are shutting down so get out */
+ if (retval == 0 || CtdlThreadCheckStop()) {
+ CtdlThreadName(old_name);
+ return;
+ }
+
+ /* Select says something happened on one of our master sockets so now we handle it */
+ for (serviceptr = ServiceHookTable; serviceptr != NULL; serviceptr = serviceptr->next ) {
+ if (FD_ISSET(serviceptr->msock, &readfds)) {
+ ssock = accept(serviceptr->msock, NULL, 0);
+ if (ssock >= 0) {
+ CtdlLogPrintf(CTDL_DEBUG, "New client socket %d\n", ssock);
+ /* The master socket is non-blocking but the client
+ * sockets need to be blocking, otherwise certain
+ * operations barf on FreeBSD. Not a fatal error.
+ */
+ if (fcntl(ssock, F_SETFL, 0) < 0) {
+ CtdlLogPrintf(CTDL_EMERG,
+ "citserver: Can't set socket to blocking: %s\n",
+ strerror(errno));
+ }
+
+ /* New context will be created already
+ * set up in the CON_EXECUTING state.
+ */
+ con = CreateNewContext();
+ CT->Context = con;
+
+ /* Assign our new socket number to it. */
+ con->client_socket = ssock;
+ con->h_command_function = serviceptr->h_command_function;
+ con->h_async_function = serviceptr->h_async_function;
+ con->ServiceName = serviceptr->ServiceName;
+ /* Determine whether it's a local socket */
+ if (serviceptr->sockpath != NULL)
+ con->is_local_socket = 1;
+
+ /* Set the SO_REUSEADDR socket option */
+ i = 1;
+ setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i));
+
+ become_session(con);
+ begin_session(con);
+ serviceptr->h_greeting_function();
+ become_session(NULL);
+ con->state = CON_IDLE;
+ break;
+ }
+ }
+ }
+
+ CtdlThreadName(old_name);
+}
+
+/*
+ * Select on client socket.
+ * First worker thread in here acquires the lock and builds an FDSET of client sockets.
+ * then it selects on the client sockets timing out after 1 second.
+ * If it times out the thread goes off to check on housekeeping etc.
+ * If the select succeeds the thread goes off to handle the client request.
+ * If the list of client connections is empty the threads all sleep for one second
+ */
+struct CitContext *select_on_client(void)
+{
+ fd_set readfds;
+ struct timeval tv;
+ int retval = 0;
+ int highest=0;
+ const char *old_name;
+
+
+ old_name = CtdlThreadName("select_on_client");
+
+ /* Initialise the fdset */
+ FD_ZERO(&readfds);
+ FD_SET(CT->Context->client_socket, &readfds);
+ highest = CT->Context->client_socket;
+ /* Now we can select on any connections that are waiting */
+
+ if (!CtdlThreadCheckStop())
+ {
+ tv.tv_sec = config.c_sleeping; /* wake up every second if no input */
+ tv.tv_usec = 0;
+ retval = select(highest + 1, &readfds, NULL, NULL, &tv);
+ }
+ else /* Shutting down? */
+ {
+ CtdlThreadName(old_name);
+ return(NULL);
+ }
+
+
+ /* Now figure out who made this select() unblock.
+ * First, check for an error or exit condition.
+ */
+ if (retval < 0) {
+ if (errno == EBADF) {
+ CtdlLogPrintf(CTDL_NOTICE, "select() failed: (%s)\n",
+ strerror(errno));
+ }
+ if (errno != EINTR) {
+ CtdlLogPrintf(CTDL_EMERG, "Exiting (%s)\n", strerror(errno));
+ CtdlThreadStopAll();
+ } else if (!CtdlThreadCheckStop()) {
+ CtdlLogPrintf(CTDL_DEBUG, "Un handled select failure.\n");
+ }
+ CtdlThreadName(old_name);
+ return NULL;
+ }
+ else if(retval == 0)
+ {
+ CtdlThreadName(old_name);
+ CT->Context->kill_me = 1;
+ CT->Context = NULL;
+ return CT->Context;
+ }
+
+ CT->Context->state = CON_EXECUTING;
+ CT->Context->input_waiting = 1;
+
+ CtdlThreadName(old_name);
+ return (CT->Context);
+}
+
+
+
+/*
+ * Do the worker threads work when needed
+ */
+int execute_session(struct CitContext *bind_me)
+{
+ int force_purge;
+
+ become_session(bind_me);
+
+ /* If the client has sent a command, execute it. */
+ if (CC->input_waiting) {
+ CC->h_command_function();
+ CC->input_waiting = 0;
+ }
+
+ /* If there are asynchronous messages waiting and the
+ * client supports it, do those now */
+ if ((CC->is_async) && (CC->async_waiting)
+ && (CC->h_async_function != NULL)) {
+ CC->h_async_function();
+ CC->async_waiting = 0;
+ }
+
+ force_purge = CC->kill_me;
+ if (force_purge)
+ CT->Context = NULL;
+ become_session(NULL);
+ bind_me->state = CON_IDLE;
+ return force_purge;
+}
+
+
+
+extern void dead_session_purge(int force);
+
+/*
+ * A new worker_thread loop.
+ */
+
+void *new_worker_thread(void *arg)
+{
+ struct CitContext *bind_me;
+ int force_purge;
+
+ while (!CtdlThreadCheckStop()) {
+
+ /* make doubly sure we're not holding any stale db handles
+ * which might cause a deadlock.
+ */
+ cdb_check_handles();
+ force_purge = 0;
+ bind_me = NULL; /* Which session shall we handle? */
+
+ if (CT->Context == NULL)
+ select_on_master();
+ if (CtdlThreadCheckStop())
+ break;
+
+ if (CT->Context)
+ bind_me = select_on_client();
+ if (CtdlThreadCheckStop())
+ break;
+
+ if (bind_me)
+ force_purge = execute_session(bind_me);
+
+ dead_session_purge(force_purge);
+ if (CtdlThreadCheckStop())
+ break;
+
+ do_housekeeping();
+ }
+ return NULL;
+}