From: Art Cancro Date: Sun, 20 Mar 2011 06:00:41 +0000 (-0400) Subject: Switched back to the old style thread architecture in preparation for eventual migrat... X-Git-Tag: v8.11~813 X-Git-Url: https://code.citadel.org/?p=citadel.git;a=commitdiff_plain;h=d04e1208cf6c71816adb600c6493c48c7b9610f1 Switched back to the old style thread architecture in preparation for eventual migration to libev --- diff --git a/citadel/citserver.c b/citadel/citserver.c index 75b23badb..cff393ba0 100644 --- a/citadel/citserver.c +++ b/citadel/citserver.c @@ -172,11 +172,6 @@ void master_startup(void) { syslog(LOG_INFO, "Opening databases\n"); open_databases(); - - ctdl_thread_internal_init_tsd(); - - CtdlThreadAllocTSD(); - check_ref_counts(); syslog(LOG_INFO, "Creating base rooms (if necessary)\n"); @@ -303,10 +298,10 @@ void cmd_info(char *cmdbuf) { cprintf("%s\n", config.c_default_cal_zone); - /* Output load averages */ - cprintf("%f\n", CtdlThreadLoadAvg); - cprintf("%f\n", CtdlThreadWorkerAvg); - cprintf("%d\n", CtdlThreadGetCount()); + /* thread load averages -- temporarily disabled during refactoring of this code */ + cprintf("0\n"); /* load average */ + cprintf("0\n"); /* worker average */ + cprintf("0\n"); /* thread count */ cprintf("1\n"); /* yes, Sieve mail filtering is supported */ cprintf("%d\n", config.c_enable_fulltext); @@ -1031,16 +1026,12 @@ void cmd_lout(char *argbuf) */ void do_command_loop(void) { char cmdbuf[SIZ]; - const char *old_name = NULL; - - old_name = CtdlThreadName("do_command_loop"); time(&CC->lastcmd); memset(cmdbuf, 0, sizeof cmdbuf); /* Clear it, just in case */ if (client_getln(cmdbuf, sizeof cmdbuf) < 1) { syslog(LOG_ERR, "Citadel client disconnected: ending session.\n"); CC->kill_me = KILLME_CLIENT_DISCONNECTED; - CtdlThreadName(old_name); return; } @@ -1069,8 +1060,6 @@ void do_command_loop(void) { time(&CC->lastidle); } - CtdlThreadName(cmdbuf); - if ((strncasecmp(cmdbuf, "ENT0", 4)) && (strncasecmp(cmdbuf, "MESG", 4)) && (strncasecmp(cmdbuf, "MSGS", 4))) @@ -1086,7 +1075,6 @@ void do_command_loop(void) { /* Run any after-each-command routines registered by modules */ PerformSessionHooks(EVT_CMD); - CtdlThreadName(old_name); } diff --git a/citadel/configure.ac b/citadel/configure.ac index e29bbea0c..c0f325b75 100644 --- a/citadel/configure.ac +++ b/citadel/configure.ac @@ -467,16 +467,6 @@ AC_ARG_WITH(gprof, ] ) -dnl disable thread table reporting -AC_ARG_WITH(threadlog, - [ --with-threadlog enable logging of thread table], - [ if test "x$withval" != "xno" ; then - AC_DEFINE(WITH_THREADLOG, [], [Define if you want logging of the thread tables.]) - fi - ] -) - - if test "$ac_cv_func_gethostbyname" = no; then AC_CHECK_LIB(nsl, gethostbyname) fi diff --git a/citadel/context.c b/citadel/context.c index f49d645dd..c6575d3db 100644 --- a/citadel/context.c +++ b/citadel/context.c @@ -2,7 +2,7 @@ * Citadel context management stuff. * Here's where we (hopefully) have all the code that manipulates contexts. * - * Copyright (c) 1987-2010 by the citadel.org team + * Copyright (c) 1987-2011 by the citadel.org team * * This program is open source software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -86,7 +86,7 @@ -citthread_key_t MyConKey; /* TSD key for MyContext() */ +pthread_key_t MyConKey; /* TSD key for MyContext() */ CitContext masterCC; @@ -259,12 +259,8 @@ int CtdlIsUserLoggedInByNum (long usernum) * This function is used *VERY* frequently and must be kept small. */ CitContext *MyContext(void) { - register CitContext *c; - - return ((c = (CitContext *) citthread_getspecific(MyConKey), - c == NULL) ? &masterCC : c - ); + return ((c = (CitContext *) pthread_getspecific(MyConKey), c == NULL) ? &masterCC : c); } @@ -310,16 +306,8 @@ void terminate_idle_sessions(void) /* * During shutdown, close the sockets of any sessions still connected. */ -void terminate_stuck_sessions(void) +void terminate_all_sessions(void) { - - abort(); - - /* FIXME this function has been disabled because it is somehow being - * called at times other than server shutdown, which is throwing all - * the users off. EPIC FAIL!!! - */ - CitContext *ccptr; int killed = 0; @@ -327,7 +315,7 @@ void terminate_stuck_sessions(void) for (ccptr = ContextList; ccptr != NULL; ccptr = ccptr->next) { if (ccptr->client_socket != -1) { - syslog(LOG_INFO, "terminate_stuck_sessions() is murdering %s", ccptr->curr_user); + syslog(LOG_INFO, "terminate_all_sessions() is murdering %s", ccptr->curr_user); close(ccptr->client_socket); ccptr->client_socket = -1; killed++; @@ -388,7 +376,6 @@ void RemoveContext (CitContext *con) - /* * Initialize a new context and place it in the list. The session number * used to be the PID (which is why it's called cs_pid), but that was when we @@ -405,7 +392,8 @@ CitContext *CreateNewContext(void) { return NULL; } memset(me, 0, sizeof(CitContext)); - /* Give the contaxt a name. Hopefully makes it easier to track */ + + /* Give the context a name. Hopefully makes it easier to track */ strcpy (me->user.fullname, "SYS_notauth"); /* The new context will be created already in the CON_EXECUTING state @@ -418,12 +406,16 @@ CitContext *CreateNewContext(void) { * the list. */ me->MigrateBuf = NewStrBuf(); + me->RecvBuf.Buf = NewStrBuf(); + + me->lastcmd = time(NULL); /* set lastcmd to now to prevent idle timer infanticide TODO: if we have a valid IO, use that to set the timer. */ + + begin_critical_section(S_SESSION_TABLE); me->cs_pid = ++next_pid; me->prev = NULL; me->next = ContextList; - me->lastcmd = time(NULL); /* set lastcmd to now to prevent idle timer infanticide */ ContextList = me; if (me->next != NULL) { me->next->prev = me; @@ -565,7 +557,7 @@ void CtdlClearSystemContext(void) CitContext *CCC = MyContext(); memset(CCC, 0, sizeof(CitContext)); - citthread_setspecific(MyConKey, NULL); + pthread_setspecific(MyConKey, NULL); } /* @@ -671,7 +663,7 @@ void dead_session_purge(int force) { * function initializes it. */ void InitializeMasterCC(void) { - memset(&masterCC, 0, sizeof( CitContext)); + memset(&masterCC, 0, sizeof(struct CitContext)); masterCC.internal_pgm = 1; masterCC.cs_pid = 0; } diff --git a/citadel/context.h b/citadel/context.h index 224b468f9..e0cad917b 100644 --- a/citadel/context.h +++ b/citadel/context.h @@ -145,7 +145,7 @@ typedef struct CitContext CitContext; #define CC MyContext() -extern citthread_key_t MyConKey; /* TSD key for MyContext() */ +extern pthread_key_t MyConKey; /* TSD key for MyContext() */ extern int num_sessions; extern CitContext masterCC; extern CitContext *ContextList; @@ -162,7 +162,7 @@ void set_async_waiting(struct CitContext *ccptr); CitContext *CloneContext(CitContext *CloneMe); /* forcibly close and flush fd's on shutdown */ -void terminate_stuck_sessions(void); +void terminate_all_sessions(void); /* Deprecated, user CtdlBumpNewMailCounter() instead */ void BumpNewMailCounter(long) __attribute__ ((deprecated)); @@ -182,7 +182,7 @@ static INLINE void become_session(CitContext *which_con) { /* pid_t tid = syscall(SYS_gettid); */ - citthread_setspecific(MyConKey, (void *)which_con ); + pthread_setspecific(MyConKey, (void *)which_con ); /* syslog(LOG_DEBUG, "[%d]: Now doing %s\n", (int) tid, diff --git a/citadel/database.c b/citadel/database.c index d0aab0730..97e6c4cf0 100644 --- a/citadel/database.c +++ b/citadel/database.c @@ -85,7 +85,7 @@ static DB_ENV *dbenv; /* The DB environment (global) */ void cdb_verbose_log(const DB_ENV *dbenv, const char *msg) { if (!IsEmptyStr(msg)) { - syslog(LOG_DEBUG, "DB: %s\n", msg); + syslog(LOG_DEBUG, "DB: %s", msg); } } @@ -93,7 +93,7 @@ void cdb_verbose_log(const DB_ENV *dbenv, const char *msg) /* Verbose logging callback */ void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg) { - syslog(LOG_ALERT, "DB: %s\n", msg); + syslog(LOG_ALERT, "DB: %s", msg); } @@ -105,7 +105,7 @@ static void txabort(DB_TXN * tid) ret = tid->abort(tid); if (ret) { - syslog(LOG_EMERG, "bdb(): txn_abort: %s\n", db_strerror(ret)); + syslog(LOG_EMERG, "bdb(): txn_abort: %s", db_strerror(ret)); abort(); } } @@ -118,7 +118,7 @@ static void txcommit(DB_TXN * tid) ret = tid->commit(tid, 0); if (ret) { - syslog(LOG_EMERG, "bdb(): txn_commit: %s\n", db_strerror(ret)); + syslog(LOG_EMERG, "bdb(): txn_commit: %s", db_strerror(ret)); abort(); } } @@ -131,14 +131,14 @@ static void txbegin(DB_TXN ** tid) ret = dbenv->txn_begin(dbenv, NULL, tid, 0); if (ret) { - syslog(LOG_EMERG, "bdb(): txn_begin: %s\n", db_strerror(ret)); + syslog(LOG_EMERG, "bdb(): txn_begin: %s", db_strerror(ret)); abort(); } } static void dbpanic(DB_ENV * env, int errval) { - syslog(LOG_EMERG, "bdb(): PANIC: %s\n", db_strerror(errval)); + syslog(LOG_EMERG, "bdb(): PANIC: %s", db_strerror(errval)); } static void cclose(DBC * cursor) @@ -146,7 +146,7 @@ static void cclose(DBC * cursor) int ret; if ((ret = cursor->c_close(cursor))) { - syslog(LOG_EMERG, "bdb(): c_close: %s\n", db_strerror(ret)); + syslog(LOG_EMERG, "bdb(): c_close: %s", db_strerror(ret)); abort(); } } @@ -157,29 +157,20 @@ static void bailIfCursor(DBC ** cursors, const char *msg) for (i = 0; i < MAXCDB; i++) if (cursors[i] != NULL) { - syslog(LOG_EMERG, - "bdb(): cursor still in progress on cdb %02x: %s\n", i, msg); + syslog(LOG_EMERG, "bdb(): cursor still in progress on cdb %02x: %s", i, msg); abort(); } } -void check_handles(void *arg) -{ - if (arg != NULL) { - ThreadTSD *tsd = (ThreadTSD *) arg; - - bailIfCursor(tsd->cursors, "in check_handles"); - - if (tsd->tid != NULL) { - syslog(LOG_EMERG, "bdb(): transaction still in progress!"); - abort(); - } - } -} void cdb_check_handles(void) { - check_handles(pthread_getspecific(ThreadKey)); + bailIfCursor(TSD->cursors, "in check_handles"); + + if (TSD->tid != NULL) { + syslog(LOG_EMERG, "bdb(): transaction still in progress!"); + abort(); + } } @@ -197,14 +188,14 @@ static void cdb_cull_logs(void) /* Get the list of names. */ if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) { - syslog(LOG_ERR, "cdb_cull_logs: %s\n", db_strerror(ret)); + syslog(LOG_ERR, "cdb_cull_logs: %s", db_strerror(ret)); return; } /* Print the list of names. */ if (list != NULL) { for (file = list; *file != NULL; ++file) { - syslog(LOG_DEBUG, "Deleting log: %s\n", *file); + syslog(LOG_DEBUG, "Deleting log: %s", *file); ret = unlink(*file); if (ret != 0) { snprintf(errmsg, sizeof(errmsg), @@ -228,7 +219,7 @@ static void cdb_cull_logs(void) void cmd_cull(char *argbuf) { if (CtdlAccessCheck(ac_internal)) return; cdb_cull_logs(); - cprintf("%d Database log file cull completed.\n", CIT_OK); + cprintf("%d Database log file cull completed.", CIT_OK); } @@ -239,11 +230,11 @@ void cdb_checkpoint(void) { int ret; - syslog(LOG_DEBUG, "-- db checkpoint --\n"); + syslog(LOG_DEBUG, "-- db checkpoint --"); ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0); if (ret != 0) { - syslog(LOG_EMERG, "cdb_checkpoint: txn_checkpoint: %s\n", db_strerror(ret)); + syslog(LOG_EMERG, "cdb_checkpoint: txn_checkpoint: %s", db_strerror(ret)); abort(); } @@ -270,22 +261,22 @@ void open_databases(void) int dbversion_major, dbversion_minor, dbversion_patch; int current_dbversion = 0; - syslog(LOG_DEBUG, "bdb(): open_databases() starting\n"); - syslog(LOG_DEBUG, "Compiled db: %s\n", DB_VERSION_STRING); - syslog(LOG_INFO, " Linked db: %s\n", + syslog(LOG_DEBUG, "bdb(): open_databases() starting"); + syslog(LOG_DEBUG, "Compiled db: %s", DB_VERSION_STRING); + syslog(LOG_INFO, " Linked db: %s", db_version(&dbversion_major, &dbversion_minor, &dbversion_patch)); current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch; - syslog(LOG_DEBUG, "Calculated dbversion: %d\n", current_dbversion); - syslog(LOG_DEBUG, " Previous dbversion: %d\n", CitControl.MMdbversion); + syslog(LOG_DEBUG, "Calculated dbversion: %d", current_dbversion); + syslog(LOG_DEBUG, " Previous dbversion: %d", CitControl.MMdbversion); if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL) && (CitControl.MMdbversion > current_dbversion) ) { - syslog(LOG_EMERG, "You are attempting to run the Citadel server using a version\n" - "of Berkeley DB that is older than that which last created or\n" - "updated the database. Because this would probably cause data\n" - "corruption or loss, the server is aborting execution now.\n"); + syslog(LOG_EMERG, "You are attempting to run the Citadel server using a version"); + syslog(LOG_EMERG, "of Berkeley DB that is older than that which last created or"); + syslog(LOG_EMERG, "updated the database. Because this would probably cause data"); + syslog(LOG_EMERG, "corruption or loss, the server is aborting execution now."); exit(CTDLEXIT_DB); } @@ -316,7 +307,7 @@ void open_databases(void) ctdl_data_dir, strerror(errno)); } syslog(LOG_DEBUG, "bdb(): Setting up DB environment\n"); - db_env_set_func_yield((int (*)(u_long, u_long))sched_yield); + /* db_env_set_func_yield((int (*)(u_long, u_long))sched_yield); */ ret = db_env_create(&dbenv, 0); if (ret) { syslog(LOG_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret)); @@ -453,8 +444,6 @@ void close_databases(void) int a; int ret; - ctdl_thread_internal_free_tsd(); - if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) { syslog(LOG_EMERG, "txn_checkpoint: %s\n", db_strerror(ret)); @@ -530,8 +519,8 @@ void cdb_decompress_if_necessary(struct cdbdata *cdb) cdb->len = (size_t) destLen; cdb->ptr = uncompressed_data; #else /* HAVE_ZLIB */ - syslog(LOG_EMERG, "Database contains compressed data, but this citserver was built without compression support.\n"); - abort(); + syslog(LOG_EMERG, "Database contains compressed data, but this citserver was built without compression support."); + exit(CTDLEXIT_DB); #endif /* HAVE_ZLIB */ } @@ -586,14 +575,14 @@ int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen) } #endif - if (MYTID != NULL) { + if (TSD->tid != NULL) { ret = dbp[cdb]->put(dbp[cdb], /* db */ - MYTID, /* transaction ID */ + TSD->tid, /* transaction ID */ &dkey, /* key */ &ddata, /* data */ 0); /* flags */ if (ret) { - syslog(LOG_EMERG, "cdb_store(%d): %s\n", cdb, db_strerror(ret)); + syslog(LOG_EMERG, "cdb_store(%d): %s", cdb, db_strerror(ret)); abort(); } #ifdef HAVE_ZLIB @@ -603,7 +592,7 @@ int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen) return ret; } else { - bailIfCursor(MYCURSORS, "attempt to write during r/o cursor"); + bailIfCursor(TSD->cursors, "attempt to write during r/o cursor"); retry: txbegin(&tid); @@ -617,8 +606,7 @@ int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen) txabort(tid); goto retry; } else { - syslog(LOG_EMERG, "cdb_store(%d): %s\n", - cdb, db_strerror(ret)); + syslog(LOG_EMERG, "cdb_store(%d): %s", cdb, db_strerror(ret)); abort(); } } else { @@ -648,8 +636,8 @@ int cdb_delete(int cdb, void *key, int keylen) dkey.size = keylen; dkey.data = key; - if (MYTID != NULL) { - ret = dbp[cdb]->del(dbp[cdb], MYTID, &dkey, 0); + if (TSD->tid != NULL) { + ret = dbp[cdb]->del(dbp[cdb], TSD->tid, &dkey, 0); if (ret) { syslog(LOG_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret)); if (ret != DB_NOTFOUND) { @@ -657,7 +645,7 @@ int cdb_delete(int cdb, void *key, int keylen) } } } else { - bailIfCursor(MYCURSORS, "attempt to delete during r/o cursor"); + bailIfCursor(TSD->cursors, "attempt to delete during r/o cursor"); retry: txbegin(&tid); @@ -684,12 +672,10 @@ static DBC *localcursor(int cdb) int ret; DBC *curs; - if (MYCURSORS[cdb] == NULL) - ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &curs, 0); + if (TSD->cursors[cdb] == NULL) + ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &curs, 0); else - ret = - MYCURSORS[cdb]->c_dup(MYCURSORS[cdb], &curs, - DB_POSITION); + ret = TSD->cursors[cdb]->c_dup(TSD->cursors[cdb], &curs, DB_POSITION); if (ret) { syslog(LOG_EMERG, "localcursor: %s\n", db_strerror(ret)); @@ -716,10 +702,10 @@ struct cdbdata *cdb_fetch(int cdb, const void *key, int keylen) dkey.size = keylen; dkey.data = key; - if (MYTID != NULL) { + if (TSD->tid != NULL) { memset(&dret, 0, sizeof(DBT)); dret.flags = DB_DBT_MALLOC; - ret = dbp[cdb]->get(dbp[cdb], MYTID, &dkey, &dret, 0); + ret = dbp[cdb]->get(dbp[cdb], TSD->tid, &dkey, &dret, 0); } else { DBC *curs; @@ -775,11 +761,11 @@ void cdb_free(struct cdbdata *cdb) void cdb_close_cursor(int cdb) { - if (MYCURSORS[cdb] != NULL) { - cclose(MYCURSORS[cdb]); + if (TSD->cursors[cdb] != NULL) { + cclose(TSD->cursors[cdb]); } - MYCURSORS[cdb] = NULL; + TSD->cursors[cdb] = NULL; } /* @@ -791,17 +777,17 @@ void cdb_rewind(int cdb) { int ret = 0; - if (MYCURSORS[cdb] != NULL) { + if (TSD->cursors[cdb] != NULL) { syslog(LOG_EMERG, "cdb_rewind: must close cursor on database %d before reopening.\n", cdb); abort(); - /* cclose(MYCURSORS[cdb]); */ + /* cclose(TSD->cursors[cdb]); */ } /* * Now initialize the cursor */ - ret = dbp[cdb]->cursor(dbp[cdb], MYTID, &MYCURSORS[cdb], 0); + ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &TSD->cursors[cdb], 0); if (ret) { syslog(LOG_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret)); abort(); @@ -824,15 +810,14 @@ struct cdbdata *cdb_next_item(int cdb) memset(&data, 0, sizeof(data)); data.flags = DB_DBT_MALLOC; - ret = MYCURSORS[cdb]->c_get(MYCURSORS[cdb], &key, &data, DB_NEXT); + ret = TSD->cursors[cdb]->c_get(TSD->cursors[cdb], &key, &data, DB_NEXT); if (ret) { if (ret != DB_NOTFOUND) { syslog(LOG_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret)); abort(); } - cclose(MYCURSORS[cdb]); - MYCURSORS[cdb] = NULL; + cdb_close_cursor(cdb); return NULL; /* presumably, end of file */ } @@ -853,14 +838,14 @@ struct cdbdata *cdb_next_item(int cdb) void cdb_begin_transaction(void) { - bailIfCursor(MYCURSORS, "can't begin transaction during r/o cursor"); + bailIfCursor(TSD->cursors, "can't begin transaction during r/o cursor"); - if (MYTID != NULL) { + if (TSD->tid != NULL) { syslog(LOG_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n"); abort(); } - txbegin(&MYTID); + txbegin(&TSD->tid); } void cdb_end_transaction(void) @@ -868,23 +853,23 @@ void cdb_end_transaction(void) int i; for (i = 0; i < MAXCDB; i++) - if (MYCURSORS[i] != NULL) { + if (TSD->cursors[i] != NULL) { syslog(LOG_WARNING, "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n", i); - cclose(MYCURSORS[i]); - MYCURSORS[i] = NULL; + cclose(TSD->cursors[i]); + TSD->cursors[i] = NULL; } - if (MYTID == NULL) { + if (TSD->tid == NULL) { syslog(LOG_EMERG, "cdb_end_transaction: ERROR: txcommit(NULL) !!\n"); abort(); } else { - txcommit(MYTID); + txcommit(TSD->tid); } - MYTID = NULL; + TSD->tid = NULL; } /* @@ -896,12 +881,11 @@ void cdb_trunc(int cdb) int ret; u_int32_t count; - if (MYTID != NULL) { - syslog(LOG_EMERG, - "cdb_trunc must not be called in a transaction.\n"); + if (TSD->tid != NULL) { + syslog(LOG_EMERG, "cdb_trunc must not be called in a transaction."); abort(); } else { - bailIfCursor(MYCURSORS, "attempt to write during r/o cursor"); + bailIfCursor(TSD->cursors, "attempt to write during r/o cursor"); retry: /* txbegin(&tid); */ @@ -916,9 +900,9 @@ void cdb_trunc(int cdb) } else { syslog(LOG_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret)); if (ret == ENOMEM) { - syslog(LOG_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information.\n"); + syslog(LOG_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php/faq:troubleshooting:out_of_lock_entries for more information."); } - abort(); + exit(CTDLEXIT_DB); } } else { /* txcommit(tid); */ diff --git a/citadel/event_client.c b/citadel/event_client.c index 956c9e92a..a265f30be 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -81,7 +81,7 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher * Server DB IO */ extern int evdb_count; -extern citthread_mutex_t DBEventQueueMutex; +extern pthread_mutex_t DBEventQueueMutex; extern HashList *DBInboundEventQueue; extern struct ev_loop *event_db; extern ev_async DBAddJob; @@ -100,11 +100,11 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) IO->db_abort_by_shutdown.data = IO; ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); - citthread_mutex_lock(&DBEventQueueMutex); + pthread_mutex_lock(&DBEventQueueMutex); syslog(LOG_DEBUG, "DBEVENT Q\n"); i = ++evdb_count ; Put(DBInboundEventQueue, IKEY(i), h, NULL); - citthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_unlock(&DBEventQueueMutex); ev_async_send (event_db, &DBAddJob); syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); @@ -170,7 +170,7 @@ eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) * Client IO */ extern int evbase_count; -extern citthread_mutex_t EventQueueMutex; +extern pthread_mutex_t EventQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; extern ev_async AddJob; @@ -190,11 +190,11 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) IO->abort_by_shutdown.data = IO; ev_cleanup_start(event_base, &IO->abort_by_shutdown); - citthread_mutex_lock(&EventQueueMutex); + pthread_mutex_lock(&EventQueueMutex); syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); - citthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_unlock(&EventQueueMutex); ev_async_send (event_base, &AddJob); syslog(LOG_DEBUG, "EVENT Q Done.\n"); @@ -203,13 +203,13 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) int ShutDownEventQueue(void) { - citthread_mutex_lock(&DBEventQueueMutex); + pthread_mutex_lock(&DBEventQueueMutex); ev_async_send (event_db, &DBExitEventLoop); - citthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_unlock(&DBEventQueueMutex); - citthread_mutex_lock(&EventQueueMutex); + pthread_mutex_lock(&EventQueueMutex); ev_async_send (EV_DEFAULT_ &ExitEventLoop); - citthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_unlock(&EventQueueMutex); return 0; } diff --git a/citadel/include/ctdl_module.h b/citadel/include/ctdl_module.h index 543f1ed4b..b452f5c05 100644 --- a/citadel/include/ctdl_module.h +++ b/citadel/include/ctdl_module.h @@ -149,37 +149,7 @@ int CtdlDoDirectoryServiceFunc(char *cn, char *ou, void **object, char *module, */ void CtdlModuleStartCryptoMsgs(char *ok_response, char *nosup_response, char *error_response); - -/* - * Citadel Threads API - */ -struct CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args); -void CtdlThreadSleep(int secs); -void CtdlThreadStop(struct CtdlThreadNode *thread); -int CtdlThreadCheckStop(void); -/* void CtdlThreadCancel2(struct CtdlThreadNode *thread); Leave this out, it should never be needed */ -const char *CtdlThreadName(const char *name); -struct CtdlThreadNode *CtdlThreadSelf(void); -int CtdlThreadGetCount(void); -int CtdlThreadGetWorkers(void); -double CtdlThreadGetWorkerAvg(void); -double CtdlThreadGetLoadAvg(void); -void CtdlThreadGC(void); -void CtdlThreadStopAll(void); -int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); -void CtdlThreadAllocTSD(void); - -#define CTDLTHREAD_BIGSTACK 0x0001 -#define CTDLTHREAD_WORKER 0x0002 - -/* Macros to speed up getting outr thread */ - -#define MYCURSORS (((ThreadTSD*)pthread_getspecific(ThreadKey))->cursors) -#define MYTID (((ThreadTSD*)pthread_getspecific(ThreadKey))->tid) -#define CT (((ThreadTSD*)pthread_getspecific(ThreadKey))->self) -#define CTP ((ThreadTSD*)pthread_getspecific(ThreadKey)) - -/** return the current context list as an array and do it in a safe manner +/* return the current context list as an array and do it in a safe manner * The returned data is a copy so only reading is useful * The number of contexts is returned in count. * Beware, this does not copy any of the data pointed to by the context. diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 40b5cee81..faf38b32f 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -383,7 +383,7 @@ static void evcurl_shutdown (void) */ int evbase_count = 0; int event_add_pipe[2] = {-1, -1}; -citthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */ HashList *QueueEvents = NULL; HashList *InboundEventQueue = NULL; HashList *InboundEventQueues[2] = { NULL, NULL }; @@ -400,7 +400,7 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) const char *Key; /* get the control command... */ - citthread_mutex_lock(&EventQueueMutex); + pthread_mutex_lock(&EventQueueMutex); if (InboundEventQueues[0] == InboundEventQueue) { InboundEventQueue = InboundEventQueues[1]; @@ -410,7 +410,7 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) InboundEventQueue = InboundEventQueues[0]; q = InboundEventQueues[1]; } - citthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_unlock(&EventQueueMutex); It = GetNewHashPos(q, 0); while (GetNextHashPos(q, It, &len, &Key, &v)) @@ -437,7 +437,7 @@ void InitEventQueue(void) { struct rlimit LimitSet; - citthread_mutex_init(&EventQueueMutex, NULL); + pthread_mutex_init(&EventQueueMutex, NULL); if (pipe(event_add_pipe) != 0) { syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno)); @@ -486,7 +486,7 @@ void *client_event_thread(void *arg) InboundEventQueue = NULL; DeleteHash(&InboundEventQueues[0]); DeleteHash(&InboundEventQueues[1]); - citthread_mutex_destroy(&EventQueueMutex); +/* citthread_mutex_destroy(&EventQueueMutex); TODO */ evcurl_shutdown(); return(NULL); @@ -499,7 +499,7 @@ void *client_event_thread(void *arg) ev_loop *event_db; int evdb_count = 0; int evdb_add_pipe[2] = {-1, -1}; -citthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */ HashList *DBQueueEvents = NULL; HashList *DBInboundEventQueue = NULL; HashList *DBInboundEventQueues[2] = { NULL, NULL }; @@ -518,7 +518,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) const char *Key; /* get the control command... */ - citthread_mutex_lock(&DBEventQueueMutex); + pthread_mutex_lock(&DBEventQueueMutex); if (DBInboundEventQueues[0] == DBInboundEventQueue) { DBInboundEventQueue = DBInboundEventQueues[1]; @@ -528,7 +528,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) DBInboundEventQueue = DBInboundEventQueues[0]; q = DBInboundEventQueues[1]; } - citthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_unlock(&DBEventQueueMutex); It = GetNewHashPos(q, 0); while (GetNextHashPos(q, It, &len, &Key, &v)) @@ -562,7 +562,7 @@ void DBInitEventQueue(void) { struct rlimit LimitSet; - citthread_mutex_init(&DBEventQueueMutex, NULL); + pthread_mutex_init(&DBEventQueueMutex, NULL); if (pipe(evdb_add_pipe) != 0) { syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno)); @@ -608,7 +608,7 @@ void *db_event_thread(void *arg) DBInboundEventQueue = NULL; DeleteHash(&DBInboundEventQueues[0]); DeleteHash(&DBInboundEventQueues[1]); - citthread_mutex_destroy(&DBEventQueueMutex); +/* citthread_mutex_destroy(&DBEventQueueMutex); TODO */ return(NULL); } @@ -622,8 +622,8 @@ CTDL_MODULE_INIT(event_client) { InitEventQueue(); DBInitEventQueue(); - CtdlThreadCreate("Client event", CTDLTHREAD_BIGSTACK, client_event_thread, NULL); - CtdlThreadCreate("DB event", CTDLTHREAD_BIGSTACK, db_event_thread, NULL); + CtdlThreadCreate(/*"Client event", */ client_event_thread); + CtdlThreadCreate(/*"DB event", */db_event_thread); /// todo register shutdown callback. } #endif diff --git a/citadel/modules/extnotify/extnotify_main.c b/citadel/modules/extnotify/extnotify_main.c index 1fda772b1..dafe4ae5c 100644 --- a/citadel/modules/extnotify/extnotify_main.c +++ b/citadel/modules/extnotify/extnotify_main.c @@ -385,7 +385,7 @@ void do_extnotify_queue(void) if (doing_queue) return; doing_queue = 1; - citthread_setspecific(MyConKey, (void *)&extnotify_queue_CC); + pthread_setspecific(MyConKey, (void *)&extnotify_queue_CC); /* * Go ahead and run the queue diff --git a/citadel/modules/pop3client/serv_pop3client.c b/citadel/modules/pop3client/serv_pop3client.c index 8bbb74bfc..83ddf64cc 100644 --- a/citadel/modules/pop3client/serv_pop3client.c +++ b/citadel/modules/pop3client/serv_pop3client.c @@ -55,7 +55,7 @@ struct CitContext pop3_client_CC; -citthread_mutex_t POP3QueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t POP3QueueMutex; /* locks the access to the following vars: */ HashList *POP3QueueRooms = NULL; /* rss_room_counter */ HashList *POP3FetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */ @@ -159,12 +159,12 @@ eNextState FinalizePOP3AggrRun(AsyncIO *IO) syslog(LOG_DEBUG, "Terminating Aggregator; bye.\n"); It = GetNewHashPos(POP3FetchUrls, 0); - citthread_mutex_lock(&POP3QueueMutex); + pthread_mutex_lock(&POP3QueueMutex); { GetHashPosFromKey(POP3FetchUrls, SKEY(cptr->Url), It); DeleteEntryFromHash(POP3FetchUrls, It); } - citthread_mutex_unlock(&POP3QueueMutex); + pthread_mutex_unlock(&POP3QueueMutex); DeleteHashPos(&It); return eAbort; } @@ -882,17 +882,17 @@ void pop3client_scan_room(struct ctdlroom *qrbuf, void *data) // pop3_room_counter *Count = NULL; // pop3aggr *cpptr; - citthread_mutex_lock(&POP3QueueMutex); + pthread_mutex_lock(&POP3QueueMutex); if (GetHash(POP3QueueRooms, LKEY(qrbuf->QRnumber), &vptr)) { syslog(LOG_DEBUG, "pop3client: [%ld] %s already in progress.\n", qrbuf->QRnumber, qrbuf->QRname); - citthread_mutex_unlock(&POP3QueueMutex); + pthread_mutex_unlock(&POP3QueueMutex); return; } - citthread_mutex_unlock(&POP3QueueMutex); + pthread_mutex_unlock(&POP3QueueMutex); assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); @@ -977,7 +977,7 @@ void pop3client_scan_room(struct ctdlroom *qrbuf, void *data) #if 0 /* todo: we need to reunite the url to be shure. */ - citthread_mutex_lock(&POP3ueueMutex); + pthread_mutex_lock(&POP3ueueMutex); GetHash(POP3FetchUrls, SKEY(ptr->Url), &vptr); use_this_cptr = (pop3aggr *)vptr; @@ -1004,15 +1004,15 @@ void pop3client_scan_room(struct ctdlroom *qrbuf, void *data) Put(use_this_cptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); use_this_cptr->roomlist_parts++; } - citthread_mutex_unlock(&POP3QueueMutex); + pthread_mutex_unlock(&POP3QueueMutex); continue; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); #endif - citthread_mutex_lock(&POP3QueueMutex); + pthread_mutex_lock(&POP3QueueMutex); Put(POP3FetchUrls, SKEY(cptr->Url), cptr, DeletePOP3Aggregator); - citthread_mutex_unlock(&POP3QueueMutex); + pthread_mutex_unlock(&POP3QueueMutex); } @@ -1063,7 +1063,7 @@ void pop3client_scan(void) { CtdlForEachRoom(pop3client_scan_room, NULL); - citthread_mutex_lock(&POP3QueueMutex); + pthread_mutex_lock(&POP3QueueMutex); it = GetNewHashPos(POP3FetchUrls, 0); while (GetNextHashPos(POP3FetchUrls, it, &len, &Key, &vrptr) && (vrptr != NULL)) { @@ -1073,7 +1073,7 @@ void pop3client_scan(void) { DeletePOP3Aggregator(cptr);////TODO } DeleteHashPos(&it); - citthread_mutex_unlock(&POP3QueueMutex); + pthread_mutex_unlock(&POP3QueueMutex); syslog(LOG_DEBUG, "pop3client ended\n"); last_run = time(NULL); @@ -1083,7 +1083,7 @@ void pop3client_scan(void) { void pop3_cleanup(void) { - citthread_mutex_destroy(&POP3QueueMutex); + /* citthread_mutex_destroy(&POP3QueueMutex); TODO */ DeleteHash(&POP3FetchUrls); DeleteHash(&POP3QueueRooms); } @@ -1093,7 +1093,7 @@ CTDL_MODULE_INIT(pop3client) if (!threading) { CtdlFillSystemContext(&pop3_client_CC, "POP3aggr"); - citthread_mutex_init(&POP3QueueMutex, NULL); + pthread_mutex_init(&POP3QueueMutex, NULL); POP3QueueRooms = NewHash(1, lFlathash); POP3FetchUrls = NewHash(1, NULL); CtdlRegisterSessionHook(pop3client_scan, EVT_TIMER); diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index b3559f473..7b49ec5a6 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -62,7 +62,7 @@ #define TMP_SHORTER_URL_OFFSET 0xFE #define TMP_SHORTER_URLS 0xFD -citthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */ HashList *RSSQueueRooms = NULL; /* rss_room_counter */ HashList *RSSFetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */ @@ -161,7 +161,7 @@ eNextState FreeNetworkSaveMessage (AsyncIO *IO) { networker_save_message *Ctx = (networker_save_message *) IO->Data; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); Ctx->Cfg->RefCount --; if (Ctx->Cfg->RefCount == 0) @@ -169,7 +169,7 @@ eNextState FreeNetworkSaveMessage (AsyncIO *IO) UnlinkRSSAggregator(Ctx->Cfg); } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); CtdlFreeMessage(Ctx->Msg); free_recipients(Ctx->recp); @@ -490,22 +490,22 @@ eNextState RSSAggregatorTerminate(AsyncIO *IO) const char *HK; void *vData; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); rncptr->RefCount --; if (rncptr->RefCount == 0) { UnlinkRSSAggregator(rncptr); } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); /* At = GetNewHashPos(RSSFetchUrls, 0); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At); GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData); DeleteEntryFromHash(RSSFetchUrls, At); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); DeleteHashPos(&At); */ @@ -531,17 +531,17 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) const char *CfgPtr, *lPtr; const char *Err; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr)) { syslog(LOG_DEBUG, "rssclient: [%ld] %s already in progress.\n", qrbuf->QRnumber, qrbuf->QRname); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); return; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); @@ -600,7 +600,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) rncptr->Url = NewStrBuf(); StrBufExtract_NextToken(rncptr->Url, Line, &lPtr, '|'); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); GetHash(RSSFetchUrls, SKEY(rncptr->Url), &vptr); use_this_rncptr = (rss_aggregator *)vptr; if (use_this_rncptr != NULL) @@ -626,29 +626,29 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); use_this_rncptr->roomlist_parts++; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); continue; } - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); rncptr->ItemType = RSS_UNSET; rncptr->rooms = NewStrBufPlain(qrbuf->QRname, -1); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); Put(RSSFetchUrls, SKEY(rncptr->Url), rncptr, DeleteRssCfg); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); } } } if (Count != NULL) { Count->QRnumber = qrbuf->QRnumber; - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); syslog(LOG_DEBUG, "rssclient: [%ld] %s now starting.\n", qrbuf->QRnumber, qrbuf->QRname); Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); } FreeStrBuf(&CfgData); FreeStrBuf(&CfgType); @@ -683,7 +683,7 @@ void rssclient_scan(void) { syslog(LOG_DEBUG, "rssclient started\n"); CtdlForEachRoom(rssclient_scan_room, NULL); - citthread_mutex_lock(&RSSQueueMutex); + pthread_mutex_lock(&RSSQueueMutex); it = GetNewHashPos(RSSFetchUrls, 0); while (GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && @@ -694,7 +694,7 @@ void rssclient_scan(void) { UnlinkRSSAggregator(rptr); } DeleteHashPos(&it); - citthread_mutex_unlock(&RSSQueueMutex); + pthread_mutex_unlock(&RSSQueueMutex); syslog(LOG_DEBUG, "rssclient ended\n"); doing_rssclient = 0; @@ -703,7 +703,7 @@ void rssclient_scan(void) { void rss_cleanup(void) { - citthread_mutex_destroy(&RSSQueueMutex); + /* citthread_mutex_destroy(&RSSQueueMutex); TODO */ DeleteHash(&RSSFetchUrls); DeleteHash(&RSSQueueRooms); } @@ -713,7 +713,7 @@ CTDL_MODULE_INIT(rssclient) { if (threading) { - citthread_mutex_init(&RSSQueueMutex, NULL); + pthread_mutex_init(&RSSQueueMutex, NULL); RSSQueueRooms = NewHash(1, lFlathash); RSSFetchUrls = NewHash(1, NULL); syslog(LOG_INFO, "%s\n", curl_version()); diff --git a/citadel/modules/smtp/serv_smtpqueue.c b/citadel/modules/smtp/serv_smtpqueue.c index 538fe1bca..06fe0d262 100644 --- a/citadel/modules/smtp/serv_smtpqueue.c +++ b/citadel/modules/smtp/serv_smtpqueue.c @@ -89,7 +89,8 @@ #include "event_client.h" -citthread_mutex_t ActiveQItemsLock; +struct CitContext smtp_queue_CC; +pthread_mutex_t ActiveQItemsLock; HashList *ActiveQItems = NULL; HashList *QItemHandlers = NULL; @@ -106,21 +107,24 @@ void smtp_try_one_queue_entry(OneQueItem *MyQItem, void smtp_evq_cleanup(void) { - citthread_mutex_lock(&ActiveQItemsLock); + + pthread_mutex_lock(&ActiveQItemsLock); DeleteHash(&QItemHandlers); DeleteHash(&ActiveQItems); - citthread_mutex_unlock(&ActiveQItemsLock); - citthread_mutex_destroy(&ActiveQItemsLock); + pthread_mutex_unlock(&ActiveQItemsLock); + pthread_setspecific(MyConKey, (void *)&smtp_queue_CC); + CtdlClearSystemContext(); +/* citthread_mutex_destroy(&ActiveQItemsLock); TODO */ } int DecreaseQReference(OneQueItem *MyQItem) { int IDestructQueItem; - citthread_mutex_lock(&ActiveQItemsLock); + pthread_mutex_lock(&ActiveQItemsLock); MyQItem->ActiveDeliveries--; IDestructQueItem = MyQItem->ActiveDeliveries == 0; - citthread_mutex_unlock(&ActiveQItemsLock); + pthread_mutex_unlock(&ActiveQItemsLock); return IDestructQueItem; } @@ -129,12 +133,12 @@ void RemoveQItem(OneQueItem *MyQItem) HashPos *It; It = GetNewHashPos(MyQItem->MailQEntries, 0); - citthread_mutex_lock(&ActiveQItemsLock); + pthread_mutex_lock(&ActiveQItemsLock); { GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); DeleteEntryFromHash(ActiveQItems, It); } - citthread_mutex_unlock(&ActiveQItemsLock); + pthread_mutex_unlock(&ActiveQItemsLock); DeleteHashPos(&It); } @@ -205,13 +209,13 @@ OneQueItem *DeserializeQueueItem(StrBuf *RawQItem, long QueMsgID) Item->MessageID = -1; Item->QueMsgID = QueMsgID; - citthread_mutex_lock(&ActiveQItemsLock); + pthread_mutex_lock(&ActiveQItemsLock); if (GetHash(ActiveQItems, IKEY(QueMsgID), &v)) { /* WHOOPS. somebody else is already working on this. */ - citthread_mutex_unlock(&ActiveQItemsLock); + pthread_mutex_unlock(&ActiveQItemsLock); FreeQueItem(&Item); return NULL; } @@ -221,7 +225,7 @@ OneQueItem *DeserializeQueueItem(StrBuf *RawQItem, long QueMsgID) IKEY(Item->QueMsgID), Item, HFreeQueItem); - citthread_mutex_unlock(&ActiveQItemsLock); + pthread_mutex_unlock(&ActiveQItemsLock); } Token = NewStrBuf(); @@ -654,12 +658,12 @@ void smtp_do_procmsg(long msgnum, void *userdata) { syslog(LOG_DEBUG, "SMTP client: Retry time not yet reached.\n"); It = GetNewHashPos(MyQItem->MailQEntries, 0); - citthread_mutex_lock(&ActiveQItemsLock); + pthread_mutex_lock(&ActiveQItemsLock); { GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); DeleteEntryFromHash(ActiveQItems, It); } - citthread_mutex_unlock(&ActiveQItemsLock); + pthread_mutex_unlock(&ActiveQItemsLock); ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? DeleteHashPos(&It); return; @@ -671,12 +675,12 @@ void smtp_do_procmsg(long msgnum, void *userdata) { if (MyQItem->MessageID < 0L) { syslog(LOG_ERR, "SMTP Queue: no 'msgid' directive found!\n"); It = GetNewHashPos(MyQItem->MailQEntries, 0); - citthread_mutex_lock(&ActiveQItemsLock); + pthread_mutex_lock(&ActiveQItemsLock); { GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); DeleteEntryFromHash(ActiveQItems, It); } - citthread_mutex_unlock(&ActiveQItemsLock); + pthread_mutex_unlock(&ActiveQItemsLock); DeleteHashPos(&It); ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? return; @@ -771,12 +775,12 @@ void smtp_do_procmsg(long msgnum, void *userdata) { else { It = GetNewHashPos(MyQItem->MailQEntries, 0); - citthread_mutex_lock(&ActiveQItemsLock); + pthread_mutex_lock(&ActiveQItemsLock); { GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); DeleteEntryFromHash(ActiveQItems, It); } - citthread_mutex_unlock(&ActiveQItemsLock); + pthread_mutex_unlock(&ActiveQItemsLock); DeleteHashPos(&It); ////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this? @@ -796,32 +800,25 @@ void smtp_do_procmsg(long msgnum, void *userdata) { * * Run through the queue sending out messages. */ -void *smtp_queue_thread(void *arg) { +void smtp_do_queue(void) { + static int is_running = 0; int num_processed = 0; - struct CitContext smtp_queue_CC; - CtdlThreadSleep(10); + if (is_running) return; /* Concurrency check - only one can run */ + is_running = 1; - CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send"); - citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); - syslog(LOG_DEBUG, "smtp_queue_thread() initializing\n"); - - while (!CtdlThreadCheckStop()) { - - syslog(LOG_INFO, "SMTP client: processing outbound queue\n"); + pthread_setspecific(MyConKey, (void *)&smtp_queue_CC); + syslog(LOG_INFO, "SMTP client: processing outbound queue"); - if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) { - syslog(LOG_ERR, "Cannot find room <%s>\n", SMTP_SPOOLOUT_ROOM); - } - else { - num_processed = CtdlForEachMessage(MSGS_ALL, 0L, NULL, SPOOLMIME, NULL, smtp_do_procmsg, NULL); - } - syslog(LOG_INFO, "SMTP client: queue run completed; %d messages processed\n", num_processed); - CtdlThreadSleep(60); + if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) { + syslog(LOG_ERR, "Cannot find room <%s>", SMTP_SPOOLOUT_ROOM); } - - CtdlClearSystemContext(); - return(NULL); + else { + num_processed = CtdlForEachMessage(MSGS_ALL, 0L, NULL, SPOOLMIME, NULL, smtp_do_procmsg, NULL); + } + syslog(LOG_INFO, "SMTP client: queue run completed; %d messages processed", num_processed); + run_queue_now = 0; + is_running = 0; } @@ -892,15 +889,13 @@ void cmd_smtp(char *argbuf) { } - - CTDL_MODULE_INIT(smtp_queu) { -#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT if (!threading) { + CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send"); ActiveQItems = NewHash(1, lFlathash); - citthread_mutex_init(&ActiveQItemsLock, NULL); + pthread_mutex_init(&ActiveQItemsLock, NULL); QItemHandlers = NewHash(0, NULL); @@ -915,11 +910,9 @@ CTDL_MODULE_INIT(smtp_queu) smtp_init_spoolout(); CtdlRegisterCleanupHook(smtp_evq_cleanup); - CtdlThreadCreate("SMTPEvent Send", CTDLTHREAD_BIGSTACK, smtp_queue_thread, NULL); CtdlRegisterProtoHook(cmd_smtp, "SMTP", "SMTP utility commands"); } -#endif /* return our Subversion id for the Log */ return "smtpeventclient"; diff --git a/citadel/serv_extensions.c b/citadel/serv_extensions.c index df80cc845..a94239573 100644 --- a/citadel/serv_extensions.c +++ b/citadel/serv_extensions.c @@ -4,7 +4,7 @@ * * Copyright (c) 1987-2011 by the citadel.org team * - * This program is free software; you can redistribute it and/or modify + * This program is open source software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. @@ -16,7 +16,7 @@ * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "sysdep.h" @@ -944,7 +944,7 @@ void PerformSessionHooks(int EventType) for (fcn = SessionHookTable; fcn != NULL; fcn = fcn->next) { if (fcn->eventtype == EventType) { if (EventType == EVT_TIMER) { - citthread_setspecific(MyConKey, NULL); /* for every hook */ + pthread_setspecific(MyConKey, NULL); /* for every hook */ } (*fcn->h_function_pointer) (); } diff --git a/citadel/server.h b/citadel/server.h index f07b6ffa4..bcca4fb2b 100644 --- a/citadel/server.h +++ b/citadel/server.h @@ -136,7 +136,6 @@ enum { S_CHKPWD, S_LOG, S_NETSPOOL, - S_THREAD_LIST, S_XMPP_QUEUE, S_SCHEDULE_LIST, S_SINGLE_USER, diff --git a/citadel/server_main.c b/citadel/server_main.c index cc4570466..58a397afe 100644 --- a/citadel/server_main.c +++ b/citadel/server_main.c @@ -116,13 +116,9 @@ int main(int argc, char **argv) // eCrashSymbolTable symbol_table; #endif - /* initialise semaphores here. Patch by Matt and davew - * its called here as they are needed by syslog for thread safety - */ - InitialiseSemaphores(); - /* initialize the master context */ InitializeMasterCC(); + InitializeMasterTSD(); /* parse command-line arguments */ for (a=1; aself) - Cc->self->signal = signum; - else - { - syslog(LOG_DEBUG, "Caught signal %d; shutting down.\n", signum); - exit_signal = signum; - } + syslog(LOG_DEBUG, "Caught signal %d; shutting down.", signum); + exit_signal = signum; +/// server_shutting_down = 1; } static RETSIGTYPE signal_exit(int signum) { @@ -145,22 +138,21 @@ void init_sysdep(void) { * CitContext structure (in the ContextList linked list) of the * session to which the calling thread is currently bound. */ - if (citthread_key_create(&MyConKey, NULL) != 0) { - syslog(LOG_CRIT, "Can't create TSD key: %s\n", - strerror(errno)); + if (pthread_key_create(&MyConKey, NULL) != 0) { + syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno)); } /* - * The action for unexpected signals and exceptions should be to - * call signal_cleanup() to gracefully shut down the server. + * Interript, hangup, and terminate signals should cause the server + * to gracefully clean up and shut down. */ sigemptyset(&set); - sigaddset(&set, SIGINT); // intr = shutdown + sigaddset(&set, SIGINT); sigaddset(&set, SIGHUP); sigaddset(&set, SIGTERM); sigprocmask(SIG_UNBLOCK, &set, NULL); - signal(SIGINT, signal_cleanup); // intr = shutdown + signal(SIGINT, signal_cleanup); signal(SIGHUP, signal_cleanup); signal(SIGTERM, signal_cleanup); signal(SIGUSR2, signal_exit); @@ -214,7 +206,7 @@ int ctdl_tcp_server(char *ip_addr, int port_number, int queue_len, char *errorme snprintf(errormessage, SIZ, "Error binding to [%s] : %s", ip_addr, strerror(errno) ); - syslog(LOG_ALERT, "%s\n", errormessage); + syslog(LOG_ALERT, "%s", errormessage); return (-1); } } @@ -225,16 +217,14 @@ int ctdl_tcp_server(char *ip_addr, int port_number, int queue_len, char *errorme snprintf(errormessage, SIZ, "Error binding to [%s] : %s", ip_addr, strerror(errno) ); - syslog(LOG_ALERT, "%s\n", errormessage); + syslog(LOG_ALERT, "%s", errormessage); return (-1); } } if (port_number == 0) { - snprintf(errormessage, SIZ, - "Can't start: no port number specified." - ); - syslog(LOG_ALERT, "%s\n", errormessage); + snprintf(errormessage, SIZ, "Can't start: no port number specified."); + syslog(LOG_ALERT, "%s", errormessage); return (-1); } sin6.sin6_port = htons((u_short) port_number); @@ -247,7 +237,7 @@ int ctdl_tcp_server(char *ip_addr, int port_number, int queue_len, char *errorme snprintf(errormessage, SIZ, "Can't create a listening socket: %s", strerror(errno) ); - syslog(LOG_ALERT, "%s\n", errormessage); + syslog(LOG_ALERT, "%s", errormessage); return (-1); } /* Set some socket options that make sense. */ @@ -265,7 +255,7 @@ int ctdl_tcp_server(char *ip_addr, int port_number, int queue_len, char *errorme snprintf(errormessage, SIZ, "Can't bind: %s", strerror(errno) ); - syslog(LOG_ALERT, "%s\n", errormessage); + syslog(LOG_ALERT, "%s", errormessage); return (-1); } @@ -275,7 +265,7 @@ int ctdl_tcp_server(char *ip_addr, int port_number, int queue_len, char *errorme snprintf(errormessage, SIZ, "Can't listen: %s", strerror(errno) ); - syslog(LOG_ALERT, "%s\n", errormessage); + syslog(LOG_ALERT, "%s", errormessage); return (-1); } return (s); @@ -306,7 +296,7 @@ int ctdl_uds_server(char *sockpath, int queue_len, char *errormessage) snprintf(errormessage, SIZ, "citserver: can't unlink %s: %s", sockpath, strerror(errno) ); - syslog(LOG_EMERG, "%s\n", errormessage); + syslog(LOG_EMERG, "%s", errormessage); return(-1); } @@ -319,7 +309,7 @@ int ctdl_uds_server(char *sockpath, int queue_len, char *errormessage) snprintf(errormessage, SIZ, "citserver: Can't create a socket: %s", strerror(errno)); - syslog(LOG_EMERG, "%s\n", errormessage); + syslog(LOG_EMERG, "%s", errormessage); return(-1); } @@ -327,7 +317,7 @@ int ctdl_uds_server(char *sockpath, int queue_len, char *errormessage) snprintf(errormessage, SIZ, "citserver: Can't bind: %s", strerror(errno)); - syslog(LOG_EMERG, "%s\n", errormessage); + syslog(LOG_EMERG, "%s", errormessage); return(-1); } @@ -336,7 +326,7 @@ int ctdl_uds_server(char *sockpath, int queue_len, char *errormessage) snprintf(errormessage, SIZ, "citserver: Can't set socket to non-blocking: %s", strerror(errno)); - syslog(LOG_EMERG, "%s\n", errormessage); + syslog(LOG_EMERG, "%s", errormessage); close(s); return(-1); } @@ -345,7 +335,7 @@ int ctdl_uds_server(char *sockpath, int queue_len, char *errormessage) snprintf(errormessage, SIZ, "citserver: Can't listen: %s", strerror(errno)); - syslog(LOG_EMERG, "%s\n", errormessage); + syslog(LOG_EMERG, "%s", errormessage); return(-1); } @@ -419,13 +409,12 @@ static void flush_client_inbuf(void) */ void client_close(void) { CitContext *CCC = CC; - int r; if (!CCC) return; if (CCC->client_socket <= 0) return; - syslog(LOG_DEBUG, "Closing socket %d\n", CCC->client_socket); - r = close(CCC->client_socket); - if (!r) syslog(LOG_INFO, "close() failed: %s\n", strerror(errno)); + syslog(LOG_DEBUG, "Closing socket %d", CCC->client_socket); + + close(CCC->client_socket); CCC->client_socket = -1 ; } @@ -493,7 +482,9 @@ int client_write(const char *buf, int nbytes) if (select(1, NULL, &wset, NULL, NULL) == -1) { if (errno == EINTR) { - syslog(LOG_DEBUG, "client_write(%d bytes) select() interrupted.\n", nbytes-bytes_written); + syslog(LOG_DEBUG, "client_write(%d bytes) select() interrupted.", + nbytes-bytes_written + ); if (CtdlThreadCheckStop()) { CC->kill_me = KILLME_SELECT_INTERRUPTED; return (-1); @@ -503,7 +494,7 @@ int client_write(const char *buf, int nbytes) } } else { syslog(LOG_ERR, - "client_write(%d bytes) select failed: %s (%d)\n", + "client_write(%d bytes) select failed: %s (%d)", nbytes - bytes_written, strerror(errno), errno ); @@ -518,13 +509,12 @@ int client_write(const char *buf, int nbytes) retval = write(Ctx->client_socket, &buf[bytes_written], nbytes - bytes_written); if (retval < 1) { syslog(LOG_ERR, - "client_write(%d bytes) failed: %s (%d)\n", + "client_write(%d bytes) failed: %s (%d)", nbytes - bytes_written, strerror(errno), errno ); cit_backtrace(); client_close(); - // syslog(LOG_DEBUG, "Tried to send: %s", &buf[bytes_written]); Ctx->kill_me = KILLME_WRITE_FAILED; return -1; } @@ -593,9 +583,7 @@ int client_read_blob(StrBuf *Target, int bytes, int timeout) #endif retval = client_read_sslblob(Target, bytes, timeout); if (retval < 0) { - syslog(LOG_CRIT, - "%s failed\n", - __FUNCTION__); + syslog(LOG_CRIT, "%s failed", __FUNCTION__); } #ifdef BIGBAD_IODBG snprintf(fn, SIZ, "/tmp/foolog_%s.%d", CCC->ServiceName, CCC->cs_pid); @@ -1146,7 +1134,7 @@ int convert_login(char NameToConvert[]) { /* * This loop just keeps going and going and going... */ -void *worker_thread(void *arg) { +void *worker_thread(void *blah) { int highest; CitContext *ptr; CitContext *bind_me = NULL; @@ -1154,7 +1142,6 @@ void *worker_thread(void *arg) { int retval = 0; struct timeval tv; int force_purge = 0; - while (!CtdlThreadCheckStop()) { @@ -1208,18 +1195,18 @@ do_select: force_purge = 0; if (!CtdlThreadCheckStop()) { tv.tv_sec = 1; /* wake up every second if no input */ tv.tv_usec = 0; - retval = CtdlThreadSelect(highest + 1, &readfds, NULL, NULL, &tv); + retval = select(highest + 1, &readfds, NULL, NULL, &tv); } - else + else { return NULL; + } /* Now figure out who made this select() unblock. * First, check for an error or exit condition. */ if (retval < 0) { if (errno == EBADF) { - syslog(LOG_NOTICE, "select() failed: (%s)\n", - strerror(errno)); + syslog(LOG_NOTICE, "select() failed: (%s)\n", strerror(errno)); goto do_select; } if (errno != EINTR) { @@ -1228,13 +1215,13 @@ do_select: force_purge = 0; continue; } else { #if 0 - syslog(LOG_DEBUG, "Interrupted CtdlThreadSelect.\n"); + syslog(LOG_DEBUG, "Interrupted select()\n"); #endif if (CtdlThreadCheckStop()) return(NULL); goto do_select; } } - else if(retval == 0) { + else if (retval == 0) { if (CtdlThreadCheckStop()) return(NULL); } @@ -1318,21 +1305,21 @@ SKIP_SELECT: * In other words it handles new connections. * It is a thread. */ -void *select_on_master (void *arg) +void *select_on_master(void *blah) { struct ServiceFunctionHook *serviceptr; fd_set master_fds; int highest; struct timeval tv; int ssock; /* Descriptor for client socket */ - CitContext *con= NULL; /* Temporary context pointer */ + CitContext *con = NULL; /* Temporary context pointer */ int m; int i; int retval; struct CitContext select_on_master_CC; CtdlFillSystemContext(&select_on_master_CC, "select_on_master"); - citthread_setspecific(MyConKey, (void *)&select_on_master_CC); + pthread_setspecific(MyConKey, (void *)&select_on_master_CC); while (!CtdlThreadCheckStop()) { /* Initialize the fdset. */ @@ -1352,7 +1339,7 @@ void *select_on_master (void *arg) if (!CtdlThreadCheckStop()) { tv.tv_sec = 60; /* wake up every second if no input */ tv.tv_usec = 0; - retval = CtdlThreadSelect(highest + 1, &master_fds, NULL, NULL, &tv); + retval = select(highest + 1, &master_fds, NULL, NULL, &tv); } else return NULL; diff --git a/citadel/sysdep_decls.h b/citadel/sysdep_decls.h index c2f957804..a71efe59a 100644 --- a/citadel/sysdep_decls.h +++ b/citadel/sysdep_decls.h @@ -60,10 +60,9 @@ void start_daemon (int do_close_stdio); void checkcrash(void); void cmd_nset (char *cmdbuf); int convert_login (char *NameToConvert); -void *worker_thread (void *arg); void init_master_fdset(void); -void create_worker(void); -void *select_on_master (void *arg); +void *worker_thread(void *); +void *select_on_master(void *); extern volatile int exit_signal; extern volatile int shutdown_and_halt; @@ -76,45 +75,4 @@ extern int rescan[]; extern int SyslogFacility(char *name); - -/* - * Typdefs and stuff to abstract pthread for Citadel - */ -#ifdef HAVE_PTHREAD_H - -typedef pthread_t citthread_t; -typedef pthread_key_t citthread_key_t; -typedef pthread_mutex_t citthread_mutex_t; -typedef pthread_cond_t citthread_cond_t; -typedef pthread_attr_t citthread_attr_t; - - -#define citthread_mutex_init pthread_mutex_init -#define citthread_cond_init pthread_cond_init -#define citthread_attr_init pthread_attr_init -#define citthread_mutex_trylock pthread_mutex_trylock -#define citthread_mutex_lock pthread_mutex_lock -#define citthread_mutex_unlock pthread_mutex_unlock -#define citthread_key_create pthread_key_create -#define citthread_getspecific pthread_getspecific -#define citthread_setspecific pthread_setspecific -#define citthread_mutex_destroy pthread_mutex_destroy -#define citthread_cond_destroy pthread_cond_destroy -#define citthread_attr_destroy pthread_attr_destroy - -#define citthread_kill pthread_kill -#define citthread_cond_signal pthread_cond_signal -#define citthread_cancel pthread_cancel -#define citthread_cond_timedwait pthread_cond_timedwait -#define citthread_equal pthread_equal -#define citthread_self pthread_self -#define citthread_create pthread_create -#define citthread_attr_setstacksize pthread_attr_setstacksize -#define citthread_join pthread_join -#define citthread_cleanup_push pthread_cleanup_push -#define citthread_cleanup_pop pthread_cleanup_pop - - -#endif /* HAVE_PTHREAD_H */ - #endif /* SYSDEP_DECLS_H */ diff --git a/citadel/threads.c b/citadel/threads.c index 1f066a0cd..cb216febe 100644 --- a/citadel/threads.c +++ b/citadel/threads.c @@ -15,7 +15,7 @@ * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include @@ -75,28 +75,19 @@ */ static int num_threads = 0; /* Current number of threads */ -static int num_workers = 0; /* Current number of worker threads */ +pthread_key_t ThreadKey; +pthread_mutex_t Critters[MAX_SEMAPHORES]; /* Things needing locking */ +struct thread_tsd masterTSD; -CtdlThreadNode *CtdlThreadList = NULL; -CtdlThreadNode *CtdlThreadSchedList = NULL; -static CtdlThreadNode *GC_thread = NULL; -static char *CtdlThreadStates[CTDL_THREAD_LAST_STATE]; -double CtdlThreadLoadAvg = 0; -double CtdlThreadWorkerAvg = 0; -citthread_key_t ThreadKey; -citthread_mutex_t Critters[MAX_SEMAPHORES]; /* Things needing locking */ - - - -void InitialiseSemaphores(void) +void InitializeSemaphores(void) { int i; /* Set up a bunch of semaphores to be used for critical sections */ for (i=0; itid = NULL; - - memset(tsd->cursors, 0, sizeof tsd->cursors); - tsd->self = NULL; - - citthread_setspecific(ThreadKey, tsd); -} - - -void ctdl_thread_internal_free_tsd(void) -{ - ctdl_thread_internal_dest_tsd(citthread_getspecific(ThreadKey)); - citthread_setspecific(ThreadKey, NULL); -} - - -void ctdl_thread_internal_cleanup(void) -{ - int i; - CtdlThreadNode *this_thread, *that_thread; - - for (i=0; inext; - citthread_mutex_destroy(&that_thread->ThreadMutex); - citthread_cond_destroy(&that_thread->ThreadCond); - citthread_mutex_destroy(&that_thread->SleepMutex); - citthread_cond_destroy(&that_thread->SleepCond); - citthread_attr_destroy(&that_thread->attr); - free(that_thread); - } - ctdl_thread_internal_free_tsd(); -} - -void ctdl_thread_internal_init(void) -{ - CtdlThreadNode *this_thread; - int ret = 0; - - CtdlThreadStates[CTDL_THREAD_INVALID] = strdup ("Invalid Thread"); - CtdlThreadStates[CTDL_THREAD_VALID] = strdup("Valid Thread"); - CtdlThreadStates[CTDL_THREAD_CREATE] = strdup("Thread being Created"); - CtdlThreadStates[CTDL_THREAD_CANCELLED] = strdup("Thread Cancelled"); - CtdlThreadStates[CTDL_THREAD_EXITED] = strdup("Thread Exited"); - CtdlThreadStates[CTDL_THREAD_STOPPING] = strdup("Thread Stopping"); - CtdlThreadStates[CTDL_THREAD_STOP_REQ] = strdup("Thread Stop Requested"); - CtdlThreadStates[CTDL_THREAD_SLEEPING] = strdup("Thread Sleeping"); - CtdlThreadStates[CTDL_THREAD_RUNNING] = strdup("Thread Running"); - CtdlThreadStates[CTDL_THREAD_BLOCKED] = strdup("Thread Blocked"); - - /* Get ourself a thread entry */ - this_thread = malloc(sizeof(CtdlThreadNode)); - if (this_thread == NULL) { - syslog(LOG_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n"); - return; - } - // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready. - memset (this_thread, 0, sizeof(CtdlThreadNode)); - - citthread_mutex_init (&(this_thread->ThreadMutex), NULL); - citthread_cond_init (&(this_thread->ThreadCond), NULL); - citthread_mutex_init (&(this_thread->SleepMutex), NULL); - citthread_cond_init (&(this_thread->SleepCond), NULL); - - /* We are garbage collector so create us as running */ - this_thread->state = CTDL_THREAD_RUNNING; - - if ((ret = citthread_attr_init(&this_thread->attr))) { - syslog(LOG_EMERG, "Thread system, citthread_attr_init: %s\n", strerror(ret)); - free(this_thread); - return; - } - - this_thread->name = "Garbage Collection Thread"; - - this_thread->tid = citthread_self(); - GC_thread = this_thread; - CT = this_thread; - - num_threads++; // Increase the count of threads in the system. - - this_thread->next = CtdlThreadList; - CtdlThreadList = this_thread; - if (this_thread->next) - this_thread->next->prev = this_thread; - /* Set up start times */ - gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */ - memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval)); /* Changed state so mark it. */ -} - - -/* - * A function to chenge the state of a thread - */ -void ctdl_thread_internal_change_state (CtdlThreadNode *this_thread, enum CtdlThreadState new_state) -{ - /* - * Wether we change state or not we need update the load values - */ - /* This mutex not needed here? */ - citthread_mutex_lock(&this_thread->ThreadMutex); /* To prevent race condition of a sleeping thread */ - if ((new_state == CTDL_THREAD_STOP_REQ) && (this_thread->state > CTDL_THREAD_STOP_REQ)) - this_thread->state = new_state; - if (((new_state == CTDL_THREAD_SLEEPING) || (new_state == CTDL_THREAD_BLOCKED)) && (this_thread->state == CTDL_THREAD_RUNNING)) - this_thread->state = new_state; - if ((new_state == CTDL_THREAD_RUNNING) && ((this_thread->state == CTDL_THREAD_SLEEPING) || (this_thread->state == CTDL_THREAD_BLOCKED))) - this_thread->state = new_state; - citthread_mutex_unlock(&this_thread->ThreadMutex); + pthread_mutex_unlock(&Critters[which_one]); } @@ -314,169 +144,9 @@ void ctdl_thread_internal_change_state (CtdlThreadNode *this_thread, enum CtdlTh */ void CtdlThreadStopAll(void) { - /* First run any registered shutdown hooks. This probably doesn't belong here. */ - PerformSessionHooks(EVT_SHUTDOWN); - - /* then close all tcp ports so nobody else can talk to us anymore. */ - CtdlShutdownServiceHooks(); - //FIXME: The signalling of the condition should not be in the critical_section - // We need to build a list of threads we are going to signal and then signal them afterwards - - ShutDownEventQueue(); - - CtdlThreadNode *this_thread; - - begin_critical_section(S_THREAD_LIST); - this_thread = CtdlThreadList; - // Ask the GC thread to stop first so everything knows we are shutting down. - GC_thread->state = CTDL_THREAD_STOP_REQ; - while(this_thread) - { - if (!citthread_equal(this_thread->tid, GC_thread->tid)) - citthread_kill(this_thread->tid, SIGHUP); - - ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ); - citthread_cond_signal(&this_thread->ThreadCond); - citthread_cond_signal(&this_thread->SleepCond); - this_thread->stop_ticker = time(NULL); - syslog(LOG_DEBUG, "Thread system stopping thread \"%s\" (0x%08lx).\n", - this_thread->name, this_thread->tid); - this_thread = this_thread->next; - } - end_critical_section(S_THREAD_LIST); -} - - -/* - * A function to wake up all sleeping threads - */ -void CtdlThreadWakeAll(void) -{ - CtdlThreadNode *this_thread; - - syslog(LOG_DEBUG, "Thread system waking all threads.\n"); - - begin_critical_section(S_THREAD_LIST); - this_thread = CtdlThreadList; - while(this_thread) - { - if (!this_thread->thread_func) - { - citthread_cond_signal(&this_thread->ThreadCond); - citthread_cond_signal(&this_thread->SleepCond); - } - this_thread = this_thread->next; - } - end_critical_section(S_THREAD_LIST); -} - - -/* - * A function to return the number of threads running in the system - */ -int CtdlThreadGetCount(void) -{ - return num_threads; -} - -int CtdlThreadGetWorkers(void) -{ - return num_workers; -} - -double CtdlThreadGetWorkerAvg(void) -{ - double ret; - - begin_critical_section(S_THREAD_LIST); - ret = CtdlThreadWorkerAvg; - end_critical_section(S_THREAD_LIST); - return ret; -} - -double CtdlThreadGetLoadAvg(void) -{ - double load_avg[3] = {0.0, 0.0, 0.0}; - - int ret = 0; - int smp_num_cpus; - - /* Borrowed this straight from procps */ - smp_num_cpus = sysconf(_SC_NPROCESSORS_ONLN); - if(smp_num_cpus<1) smp_num_cpus=1; /* SPARC glibc is buggy */ - -#ifdef HAVE_GETLOADAVG - ret = getloadavg(load_avg, 3); -#endif - if (ret < 0) - return 0; - return load_avg[0] / smp_num_cpus; -/* - * This old chunk of code return a value that indicated the load on citserver - * This value could easily reach 100 % even when citserver was doing very little and - * hence the machine has much more spare capacity. - * Because this value was used to determine if the machine was under heavy load conditions - * from other processes in the system then citserver could be strangled un-necesarily - * What we are actually trying to achieve is to strangle citserver if the machine is heavily loaded. - * So we have changed this. - - begin_critical_section(S_THREAD_LIST); - ret = CtdlThreadLoadAvg; - end_critical_section(S_THREAD_LIST); - return ret; -*/ -} - - - - -/* - * A function to rename a thread - * Returns a const char * - */ -const char *CtdlThreadName(const char *name) -{ - const char *old_name; - - if (!CT) - { - syslog(LOG_WARNING, "Thread system WARNING. Attempt to CtdlThreadRename() a non thread. %s\n", name); - return NULL; - } - old_name = CT->name; - if (name) - CT->name = name; - return (old_name); -} - - -/* - * A function to force a thread to exit - */ -void CtdlThreadCancel(CtdlThreadNode *thread) -{ - CtdlThreadNode *this_thread; - - if (!thread) - this_thread = CT; - else - this_thread = thread; - if (!this_thread) - { - syslog(LOG_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() a non thread.\n"); - CtdlThreadStopAll(); - return; - } - - if (!this_thread->thread_func) - { - syslog(LOG_EMERG, "Thread system PANIC. Attempt to CtdlThreadCancel() the garbage collector.\n"); - CtdlThreadStopAll(); - return; - } - - ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_CANCELLED); - citthread_cancel(this_thread->tid); + terminate_all_sessions(); /* close all client sockets */ + CtdlShutdownServiceHooks(); /* close all listener sockets to prevent new connections */ + PerformSessionHooks(EVT_SHUTDOWN); /* run any registered shutdown hooks */ } @@ -485,766 +155,99 @@ void CtdlThreadCancel(CtdlThreadNode *thread) */ int CtdlThreadCheckStop(void) { - int state; - - if (!CT) - { - syslog(LOG_EMERG, "Thread system PANIC, CtdlThreadCheckStop() called by a non thread.\n"); - CtdlThreadStopAll(); - return -1; - } - - state = CT->state; - - if (CT->signal) - { - syslog(LOG_DEBUG, "Thread \"%s\" caught signal %d.\n", CT->name, CT->signal); - if (CT->signal == SIGHUP) - CT->state = CTDL_THREAD_STOP_REQ; - CT->signal = 0; - } - if(state == CTDL_THREAD_STOP_REQ) - { - CT->state = CTDL_THREAD_STOPPING; - return -1; - } - else if((state < CTDL_THREAD_STOP_REQ) && (state > CTDL_THREAD_CREATE)) - { - return -1; - } - return 0; -} - - -/* - * A function to ask a thread to exit - * The thread must call CtdlThreadCheckStop() periodically to determine if it should exit - */ -void CtdlThreadStop(CtdlThreadNode *thread) -{ - CtdlThreadNode *this_thread; - - if (!thread) - this_thread = CT; - else - this_thread = thread; - if (!this_thread) - return; - if (!(this_thread->thread_func)) - return; // Don't stop garbage collector - if (!citthread_equal(this_thread->tid, GC_thread->tid)) - citthread_kill(this_thread->tid, SIGHUP); + /* FIXME this needs to do something useful. moar code pls ! */ - ctdl_thread_internal_change_state (this_thread, CTDL_THREAD_STOP_REQ); - citthread_cond_signal(&this_thread->ThreadCond); - citthread_cond_signal(&this_thread->SleepCond); - this_thread->stop_ticker = time(NULL); -} - -/* - * So we now have a sleep command that works with threads but it is in seconds - */ -void CtdlThreadSleep(int secs) -{ - struct timespec wake_time; - struct timeval time_now; - - - if (!CT) - { - syslog(LOG_WARNING, "CtdlThreadSleep() called by something that is not a thread. Should we die?\n"); - return; - } - - memset (&wake_time, 0, sizeof(struct timespec)); - gettimeofday(&time_now, NULL); - wake_time.tv_sec = time_now.tv_sec + secs; - wake_time.tv_nsec = time_now.tv_usec * 10; - - ctdl_thread_internal_change_state (CT, CTDL_THREAD_SLEEPING); - - citthread_mutex_lock(&CT->ThreadMutex); /* Prevent something asking us to awaken before we've gone to sleep */ - citthread_cond_timedwait(&CT->SleepCond, &CT->ThreadMutex, &wake_time); - citthread_mutex_unlock(&CT->ThreadMutex); - - ctdl_thread_internal_change_state (CT, CTDL_THREAD_RUNNING); -} - - -/* - * Routine to clean up our thread function on exit - */ -static void ctdl_internal_thread_cleanup(void *arg) -{ - /* - * In here we were called by the current thread because it is exiting - * NB. WE ARE THE CURRENT THREAD - */ - if (CT) - { - const char *name = CT->name; - const pid_t tid = CT->tid; - - syslog(LOG_NOTICE, "Thread \"%s\" (0x%08lx) exited.\n", name, (unsigned long) tid); - } - else - { - syslog(LOG_NOTICE, "some ((unknown ? ? ?) Thread exited.\n"); - } - - #ifdef HAVE_BACKTRACE -/// eCrash_UnregisterThread(); - #endif - - citthread_mutex_lock(&CT->ThreadMutex); - CT->state = CTDL_THREAD_EXITED; // needs to be last thing else house keeping will unlink us too early - citthread_mutex_unlock(&CT->ThreadMutex); -} - -/* - * Garbage collection routine. - * Gets called by main() in a loop to clean up the thread list periodically. - */ -void CtdlThreadGC (void) -{ - - - return; - /* FIXME this is a big deal, but I think it's causing corruption */ - - - CtdlThreadNode *this_thread, *that_thread; - int workers = 0, sys_workers; - int ret=0; - - begin_critical_section(S_THREAD_LIST); - - /* Handle exiting of garbage collector thread */ - if(num_threads == 1) - CtdlThreadList->state = CTDL_THREAD_EXITED; - -#ifdef WITH_THREADLOG - syslog(LOG_DEBUG, "Thread system running garbage collection.\n"); -#endif - /* - * Woke up to do garbage collection - */ - this_thread = CtdlThreadList; - while(this_thread) - { - that_thread = this_thread; - this_thread = this_thread->next; - - if ((that_thread->state == CTDL_THREAD_STOP_REQ || that_thread->state == CTDL_THREAD_STOPPING) - && (!citthread_equal(that_thread->tid, citthread_self()))) - { - syslog(LOG_DEBUG, "Waiting for thread %s (0x%08lx) to exit.\n", that_thread->name, that_thread->tid); - terminate_stuck_sessions(); - } - else - { - /** - * Catch the situation where a worker was asked to stop but couldn't and we are not - * shutting down. - */ - that_thread->stop_ticker = 0; - } - - if (that_thread->stop_ticker + 5 == time(NULL)) - { - syslog(LOG_DEBUG, "Thread System: The thread \"%s\" (0x%08lx) failed to self terminate within 5 ticks. It would be cancelled now.\n", that_thread->name, that_thread->tid); - if ((that_thread->flags & CTDLTHREAD_WORKER) == 0) - syslog(LOG_INFO, "Thread System: A non worker thread would have been canceled this may cause message loss.\n"); -// that_thread->state = CTDL_THREAD_CANCELLED; - that_thread->stop_ticker++; -// citthread_cancel(that_thread->tid); -// continue; - } - - /* Do we need to clean up this thread? */ - if ((that_thread->state != CTDL_THREAD_EXITED) && (that_thread->state != CTDL_THREAD_CANCELLED)) - { - if(that_thread->flags & CTDLTHREAD_WORKER) - workers++; /* Sanity check on number of worker threads */ - continue; - } - - if (citthread_equal(that_thread->tid, citthread_self()) && that_thread->thread_func) - { /* Sanity check */ - end_critical_section(S_THREAD_LIST); - syslog(LOG_EMERG, "Thread system PANIC, a thread is trying to clean up after itself.\n"); - abort(); - return; - } - - if (num_threads <= 0) - { /* Sanity check */ - end_critical_section(S_THREAD_LIST); - syslog(LOG_EMERG, "Thread system PANIC, num_threads <= 0 and trying to do Garbage Collection.\n"); - abort(); - return; - } - - if(that_thread->flags & CTDLTHREAD_WORKER) - num_workers--; /* This is a wroker thread so reduce the count. */ - num_threads--; - /* If we are unlinking the list head then the next becomes the list head */ - if(that_thread->prev) - that_thread->prev->next = that_thread->next; - else - CtdlThreadList = that_thread->next; - if(that_thread->next) - that_thread->next->prev = that_thread->prev; - - citthread_cond_signal(&that_thread->ThreadCond); - citthread_cond_signal(&that_thread->SleepCond); // Make sure this thread is awake - citthread_mutex_lock(&that_thread->ThreadMutex); // Make sure it has done what its doing - citthread_mutex_unlock(&that_thread->ThreadMutex); - /* - * Join on the thread to do clean up and prevent memory leaks - * Also makes sure the thread has cleaned up after itself before we remove it from the list - * We can join on the garbage collector thread the join should just return EDEADLCK - */ - ret = citthread_join (that_thread->tid, NULL); - if (ret == EDEADLK) - syslog(LOG_DEBUG, "Garbage collection on own thread.\n"); - else if (ret == EINVAL) - syslog(LOG_DEBUG, "Garbage collection, that thread already joined on.\n"); - else if (ret == ESRCH) - syslog(LOG_DEBUG, "Garbage collection, no thread to join on.\n"); - else if (ret != 0) - syslog(LOG_DEBUG, "Garbage collection, citthread_join returned an unknown error(%d).\n", ret); - /* - * Now we own that thread entry - */ - syslog(LOG_INFO, "Garbage Collection for thread \"%s\" (0x%08lx).\n", - that_thread->name, that_thread->tid); - citthread_mutex_destroy(&that_thread->ThreadMutex); - citthread_cond_destroy(&that_thread->ThreadCond); - citthread_mutex_destroy(&that_thread->SleepMutex); - citthread_cond_destroy(&that_thread->SleepCond); - citthread_attr_destroy(&that_thread->attr); - free(that_thread); - } - sys_workers = num_workers; - end_critical_section(S_THREAD_LIST); - - /* Sanity check number of worker threads */ - if (workers != sys_workers) - { - syslog(LOG_EMERG, - "Thread system PANIC, discrepancy in number of worker threads. Counted %d, should be %d.\n", - workers, sys_workers - ); - abort(); - } + return 0; } - - /* - * Runtime function for a Citadel Thread. - * This initialises the threads environment and then calls the user supplied thread function - * Note that this is the REAL thread function and wraps the users thread function. + * Return a pointer to our thread-specific (not session-specific) data. */ -static void *ctdl_internal_thread_func (void *arg) -{ - CtdlThreadNode *this_thread; - void *ret = NULL; - - /* lock and unlock the thread list. - * This causes this thread to wait until all its creation stuff has finished before it - * can continue its execution. - */ - begin_critical_section(S_THREAD_LIST); - this_thread = (CtdlThreadNode *) arg; - gettimeofday(&this_thread->start_time, NULL); /* Time this thread started */ - - // Register the cleanup function to take care of when we exit. - citthread_cleanup_push(ctdl_internal_thread_cleanup, NULL); - // Get our thread data structure - CtdlThreadAllocTSD(); - CT = this_thread; - this_thread->pid = getpid(); - memcpy(&this_thread->last_state_change, &this_thread->start_time, sizeof (struct timeval)); /* Changed state so mark it. */ - /* Only change to running state if we weren't asked to stop during the create cycle - * Other wise there is a window to allow this threads creation to continue to full grown and - * therby prevent a shutdown of the server. - */ - if (!CtdlThreadCheckStop()) - { - citthread_mutex_lock(&this_thread->ThreadMutex); - this_thread->state = CTDL_THREAD_RUNNING; - citthread_mutex_unlock(&this_thread->ThreadMutex); - } - end_critical_section(S_THREAD_LIST); - - // Register for tracing - #ifdef HAVE_BACKTRACE -/// eCrash_RegisterThread(this_thread->name, 0); - #endif - - // Tell the world we are here -#if defined(HAVE_SYSCALL_H) && defined (SYS_gettid) - this_thread->reltid = syscall(SYS_gettid); -#endif - syslog(LOG_NOTICE, "Created a new thread \"%s\" (0x%08lx).\n", - this_thread->name, this_thread->tid); - - /* - * run the thread to do the work but only if we haven't been asked to stop - */ - if (!CtdlThreadCheckStop()) - ret = (this_thread->thread_func)(this_thread->user_args); - - /* - * Our thread is exiting either because it wanted to end or because the server is stopping - * We need to clean up - */ - citthread_cleanup_pop(1); // Execute our cleanup routine and remove it - - return(ret); +struct thread_tsd *MyThread(void) { + register struct thread_tsd *c; + return ((c = (struct thread_tsd *) pthread_getspecific(ThreadKey), c == NULL) ? &masterTSD : c); } - -/* - * Function to initialise an empty thread structure +/* + * Called by CtdlThreadCreate() + * We have to pass through here before starting our thread in order to create a set of data + * that is thread-specific rather than session-specific. */ -CtdlThreadNode *ctdl_internal_init_thread_struct(CtdlThreadNode *this_thread, long flags) +void *CTC_backend(void *supplied_start_routine) { - int ret = 0; - - // Ensuring this is zero'd means we make sure the thread doesn't start doing its thing until we are ready. - memset (this_thread, 0, sizeof(CtdlThreadNode)); - - /* Create the mutex's early so we can use them */ - citthread_mutex_init (&(this_thread->ThreadMutex), NULL); - citthread_cond_init (&(this_thread->ThreadCond), NULL); - citthread_mutex_init (&(this_thread->SleepMutex), NULL); - citthread_cond_init (&(this_thread->SleepCond), NULL); - - this_thread->state = CTDL_THREAD_CREATE; - - if ((ret = citthread_attr_init(&this_thread->attr))) { - citthread_mutex_unlock(&this_thread->ThreadMutex); - citthread_mutex_destroy(&(this_thread->ThreadMutex)); - citthread_cond_destroy(&(this_thread->ThreadCond)); - citthread_mutex_destroy(&(this_thread->SleepMutex)); - citthread_cond_destroy(&(this_thread->SleepCond)); - syslog(LOG_EMERG, "Thread system, citthread_attr_init: %s\n", strerror(ret)); - free(this_thread); - return NULL; - } - - /* Our per-thread stacks need to be bigger than the default size, - * otherwise the MIME parser crashes on FreeBSD, and the IMAP service - * crashes on 64-bit Linux. - */ - if (flags & CTDLTHREAD_BIGSTACK) - { -#ifdef WITH_THREADLOG - syslog(LOG_INFO, "Thread system. Creating BIG STACK thread.\n"); -#endif - if ((ret = citthread_attr_setstacksize(&this_thread->attr, THREADSTACKSIZE))) { - citthread_mutex_unlock(&this_thread->ThreadMutex); - citthread_mutex_destroy(&(this_thread->ThreadMutex)); - citthread_cond_destroy(&(this_thread->ThreadCond)); - citthread_mutex_destroy(&(this_thread->SleepMutex)); - citthread_cond_destroy(&(this_thread->SleepCond)); - citthread_attr_destroy(&this_thread->attr); - syslog(LOG_EMERG, "Thread system, citthread_attr_setstacksize: %s\n", - strerror(ret)); - free(this_thread); - return NULL; - } - } + struct thread_tsd *mytsd; + void *(*start_routine)(void*) = supplied_start_routine; - /* Set this new thread with an avg_blocked of 2. We do this so that its creation affects the - * load average for the system. If we don't do this then we create a mass of threads at the same time - * because the creation didn't affect the load average. - */ - this_thread->avg_blocked = 2; - - return (this_thread); -} + mytsd = (struct thread_tsd *) malloc(sizeof(struct thread_tsd)); + memset(mytsd, 0, sizeof(struct thread_tsd)); + pthread_setspecific(ThreadKey, (const void *) mytsd); + start_routine(NULL); + return(NULL); +} /* - * Internal function to create a thread. + * Function to create a thread. */ -CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args) +void CtdlThreadCreate(void *(*start_routine)(void*)) { + pthread_t thread; + pthread_attr_t attr; int ret = 0; - CtdlThreadNode *this_thread; - - if (num_threads >= 32767) - { - syslog(LOG_EMERG, "Thread system. Thread list full.\n"); - return NULL; - } - - this_thread = malloc(sizeof(CtdlThreadNode)); - if (this_thread == NULL) { - syslog(LOG_EMERG, "Thread system, can't allocate CtdlThreadNode, exiting\n"); - return NULL; - } - - /* Initialise the thread structure */ - if (ctdl_internal_init_thread_struct(this_thread, flags) == NULL) - { - free(this_thread); - syslog(LOG_EMERG, "Thread system, can't initialise CtdlThreadNode, exiting\n"); - return NULL; - } - /* - * If we got here we are going to create the thread so we must initilise the structure - * first because most implimentations of threading can't create it in a stopped state - * and it might want to do things with its structure that aren't initialised otherwise. - */ - if(name) - { - this_thread->name = name; - } - else - { - this_thread->name = "Un-named Thread"; - } - - this_thread->flags = flags; - this_thread->thread_func = thread_func; - this_thread->user_args = args; - - begin_critical_section(S_THREAD_LIST); - /* - * We pass this_thread into the thread as its args so that it can find out information - * about itself and it has a bit of storage space for itself, not to mention that the REAL - * thread function needs to finish off the setup of the structure - */ - if ((ret = citthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0)) - { - end_critical_section(S_THREAD_LIST); - syslog(LOG_ALERT, "Thread system, Can't create thread: %s\n", - strerror(ret)); - citthread_mutex_unlock(&this_thread->ThreadMutex); - citthread_mutex_destroy(&(this_thread->ThreadMutex)); - citthread_cond_destroy(&(this_thread->ThreadCond)); - citthread_mutex_destroy(&(this_thread->SleepMutex)); - citthread_cond_destroy(&(this_thread->SleepCond)); - citthread_attr_destroy(&this_thread->attr); - free(this_thread); - return NULL; - } - num_threads++; // Increase the count of threads in the system. - if(this_thread->flags & CTDLTHREAD_WORKER) - num_workers++; - - this_thread->next = CtdlThreadList; - CtdlThreadList = this_thread; - if (this_thread->next) - this_thread->next->prev = this_thread; - - end_critical_section(S_THREAD_LIST); - - return this_thread; -} - -/* - * Wrapper function to create a thread - * ensures the critical section and other protections are in place. - * char *name = name to give to thread, if NULL, use generic name - * int flags = flags to determine type of thread and standard facilities - */ -CtdlThreadNode *CtdlThreadCreate(char *name, long flags, void *(*thread_func) (void *arg), void *args) -{ - CtdlThreadNode *ret = NULL; - - ret = ctdl_internal_create_thread(name, flags, thread_func, args); - return ret; -} + ret = pthread_attr_init(&attr); + ret = pthread_attr_setstacksize(&attr, THREADSTACKSIZE); + ret = pthread_create(&thread, &attr, CTC_backend, (void *)start_routine); + if (ret != 0) syslog(LOG_EMERG, "pthread_create() : %s", strerror(errno)); -CtdlThreadNode *ctdl_thread_internal_start_scheduled (CtdlThreadNode *this_thread) -{ - int ret = 0; - - begin_critical_section(S_THREAD_LIST); - /* - * We pass this_thread into the thread as its args so that it can find out information - * about itself and it has a bit of storage space for itself, not to mention that the REAL - * thread function needs to finish off the setup of the structure - */ - if ((ret = citthread_create(&this_thread->tid, &this_thread->attr, ctdl_internal_thread_func, this_thread) != 0)) - { - end_critical_section(S_THREAD_LIST); - syslog(LOG_DEBUG, "Failed to start scheduled thread \"%s\": %s\n", this_thread->name, strerror(ret)); - citthread_mutex_destroy(&(this_thread->ThreadMutex)); - citthread_cond_destroy(&(this_thread->ThreadCond)); - citthread_mutex_destroy(&(this_thread->SleepMutex)); - citthread_cond_destroy(&(this_thread->SleepCond)); - citthread_attr_destroy(&this_thread->attr); - free(this_thread); - return NULL; - } - - - num_threads++; // Increase the count of threads in the system. - if(this_thread->flags & CTDLTHREAD_WORKER) - num_workers++; - - this_thread->next = CtdlThreadList; - CtdlThreadList = this_thread; - if (this_thread->next) - this_thread->next->prev = this_thread; - - end_critical_section(S_THREAD_LIST); - - - return this_thread; + ++num_threads; } - -void ctdl_thread_internal_check_scheduled(void) -{ - CtdlThreadNode *this_thread, *that_thread; - time_t now; - - /* Don't start scheduled threads if the system wants single user mode */ - if (CtdlWantSingleUser()) - return; - - if (try_critical_section(S_SCHEDULE_LIST)) - return; /* If this list is locked we wait till the next chance */ - - now = time(NULL); - -#ifdef WITH_THREADLOG - syslog(LOG_DEBUG, "Checking for scheduled threads to start.\n"); -#endif - - this_thread = CtdlThreadSchedList; - while(this_thread) - { - that_thread = this_thread; - this_thread = this_thread->next; - - if (now > that_thread->when) - { - /* Unlink from schedule list */ - if (that_thread->prev) - that_thread->prev->next = that_thread->next; - else - CtdlThreadSchedList = that_thread->next; - if (that_thread->next) - that_thread->next->prev = that_thread->prev; - - that_thread->next = that_thread->prev = NULL; -#ifdef WITH_THREADLOG - syslog(LOG_DEBUG, "About to start scheduled thread \"%s\".\n", that_thread->name); -#endif - if (CT->state > CTDL_THREAD_STOP_REQ) - { /* Only start it if the system is not stopping */ - if (ctdl_thread_internal_start_scheduled (that_thread)) - { -#ifdef WITH_THREADLOG - syslog(LOG_INFO, "Thread system, Started a scheduled thread \"%s\" (0x%08lx).\n", - that_thread->name, that_thread->tid); -#endif - } - } - } -#ifdef WITH_THREADLOG - else - { - syslog(LOG_DEBUG, "Thread \"%s\" will start in %ld seconds.\n", - that_thread->name, that_thread->when - time(NULL)); - } -#endif - } - end_critical_section(S_SCHEDULE_LIST); +void InitializeMasterTSD(void) { + memset(&masterTSD, 0, sizeof(struct thread_tsd)); } /* - * A warapper function for select so we can show a thread as blocked + * Initialize the thread system */ -int CtdlThreadSelect(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) -{ - int ret = 0; - - ctdl_thread_internal_change_state(CT, CTDL_THREAD_BLOCKED); - if (!CtdlThreadCheckStop()) - ret = select(n, readfds, writefds, exceptfds, timeout); - /** - * If the select returned <= 0 then it failed due to an error - * or timeout so this thread could stop if asked to do so. - * Anything else means it needs to continue unless the system is shutting down - */ - if (ret > 0) - { - /** - * The select says this thread needs to do something useful. - * This thread was in an idle state so it may have been asked to stop - * but if the system isn't shutting down this thread is no longer - * idle and select has given it a task to do so it must not stop - * In this condition we need to force it into the running state. - * CtdlThreadGC will clear its ticker for us. - * - * FIXME: there is still a small hole here. It is possible for the sequence of locking - * to allow the state to get changed to STOP_REQ just after this code if the other thread - * has decided to change the state before this lock, it there fore has to wait till the lock - * completes but it will continue to change the state. We need something a bit better here. - */ - citthread_mutex_lock(&CT->ThreadMutex); /* To prevent race condition of a sleeping thread */ - if (GC_thread->state > CTDL_THREAD_STOP_REQ && CT->state <= CTDL_THREAD_STOP_REQ) - { - syslog(LOG_DEBUG, "Thread %s (0x%08lx) refused stop request.\n", CT->name, CT->tid); - CT->state = CTDL_THREAD_RUNNING; - } - citthread_mutex_unlock(&CT->ThreadMutex); - } - - ctdl_thread_internal_change_state(CT, CTDL_THREAD_RUNNING); - - return ret; -} - - - -void *new_worker_thread(void *arg); -extern void close_masters (void); - - - void go_threading(void) { - int i; - CtdlThreadNode *last_worker; - - /* - * Initialise the thread system - */ - ctdl_thread_internal_init(); + if (pthread_key_create(&ThreadKey, NULL) != 0) { + syslog(LOG_EMERG, "pthread_key_create() : %s", strerror(errno)); + abort(); + } /* Second call to module init functions now that threading is up */ initialise_modules(1); - CtdlThreadCreate("select_on_master", CTDLTHREAD_BIGSTACK, select_on_master, NULL); - /* - * This thread is now used for garbage collection of other threads in the thread list - */ - syslog(LOG_INFO, "Startup thread %ld becoming garbage collector,\n", (long) citthread_self()); + CtdlThreadCreate(select_on_master); - /* - * We do a lot of locking and unlocking of the thread list in here. - * We do this so that we can repeatedly release time for other threads - * that may be waiting on the thread list. - * We are a low priority thread so we can afford to do this - */ - - while (CtdlThreadGetCount()) - { - if (CT->signal) - exit_signal = CT->signal; - if (exit_signal) - { - CtdlThreadStopAll(); - } - check_sched_shutdown(); - if (CT->state > CTDL_THREAD_STOP_REQ) - { - begin_critical_section(S_THREAD_LIST); - end_critical_section(S_THREAD_LIST); - - ctdl_thread_internal_check_scheduled(); /* start scheduled threads */ - } - - /* Reduce the size of the worker thread pool if necessary. */ - if ((CtdlThreadGetWorkers() > config.c_min_workers + 1) && (CtdlThreadWorkerAvg < 20) && (CT->state > CTDL_THREAD_STOP_REQ)) - { - /* Ask a worker thread to stop as we no longer need it */ - begin_critical_section(S_THREAD_LIST); - last_worker = CtdlThreadList; - while (last_worker) - { - citthread_mutex_lock(&last_worker->ThreadMutex); - if (last_worker->flags & CTDLTHREAD_WORKER && (last_worker->state > CTDL_THREAD_STOPPING) && (last_worker->Context == NULL)) - { - citthread_mutex_unlock(&last_worker->ThreadMutex); - break; - } - citthread_mutex_unlock(&last_worker->ThreadMutex); - last_worker = last_worker->next; - } - end_critical_section(S_THREAD_LIST); - if (last_worker) - { -#ifdef WITH_THREADLOG - syslog(LOG_DEBUG, "Thread system, stopping excess worker thread \"%s\" (0x%08lx).\n", - last_worker->name, - last_worker->tid - ); -#endif - CtdlThreadStop(last_worker); - } - } - - /* - * If all our workers are working hard, start some more to help out - * with things - */ - /* FIXME: come up with a better way to dynamically alter the number of threads - * based on the system load - */ - if ( (((CtdlThreadGetWorkers() < config.c_max_workers) - && (CtdlThreadGetWorkerAvg() > 60)) - || CtdlThreadGetWorkers() < config.c_min_workers) - && (CT->state > CTDL_THREAD_STOP_REQ) - ) - { - /* Only start new threads if we are not going to overload the machine */ - /* Temporarily set to 10 should be enough to make sure we don't stranglew the server - * at least until we make this a config option */ - if (CtdlThreadGetLoadAvg() < ((double)10.00)) { - for (i=0; i<5 ; i++) { - CtdlThreadCreate("Worker Thread", - CTDLTHREAD_BIGSTACK + CTDLTHREAD_WORKER, - worker_thread, - NULL - ); - } - } - else - syslog(LOG_WARNING, "Server strangled due to machine load average too high.\n"); - } + /* FIXME temporary fixed size pool of worker threads */ + CtdlThreadCreate(worker_thread); + CtdlThreadCreate(worker_thread); + CtdlThreadCreate(worker_thread); + CtdlThreadCreate(worker_thread); + CtdlThreadCreate(worker_thread); + CtdlThreadCreate(worker_thread); + CtdlThreadCreate(worker_thread); + CtdlThreadCreate(worker_thread); - CtdlThreadGC(); + /* At this point I am a union worker and therefore serve no useful purpose. */ - if (CtdlThreadGetCount() <= 1) // Shutting down clean up the garbage collector - { - CtdlThreadGC(); - } - -#ifdef THREADS_USESIGNALS - if (CtdlThreadGetCount() && CT->state > CTDL_THREAD_STOP_REQ) -#else - if (CtdlThreadGetCount()) -#endif - CtdlThreadSleep(1); + while(!CtdlThreadCheckStop()) { + sleep(3); } - /* - * If the above loop exits we must be shutting down since we obviously have no threads - */ - ctdl_thread_internal_cleanup(); -} - - - + /* Shut down */ + CtdlThreadStopAll(); + exit(0); +} diff --git a/citadel/threads.h b/citadel/threads.h index f6fa812bf..d080b81e7 100644 --- a/citadel/threads.h +++ b/citadel/threads.h @@ -22,127 +22,25 @@ #include "server.h" #include "sysdep_decls.h" -#ifndef timerclear -#define timerclear(tvp) ((tvp)->tv_sec = (tvp)->tv_usec = 0) -#endif - -#ifndef timerisset -#define timerisset(tvp) ((tvp)->tv_sec || (tvp)->tv_usec) -#endif - -#ifndef timercmp -#define timercmp(tvp, uvp, cmp) \ - (((tvp)->tv_sec == (uvp)->tv_sec) ? \ - ((tvp)->tv_usec cmp (uvp)->tv_usec) : \ - ((tvp)->tv_sec cmp (uvp)->tv_sec)) -#endif - -#ifndef timeradd -#define timeradd(tvp, uvp, vvp) \ - do { \ - (vvp)->tv_sec = (tvp)->tv_sec + (uvp)->tv_sec; \ - (vvp)->tv_usec = (tvp)->tv_usec + (uvp)->tv_usec; \ - if ((vvp)->tv_usec >= 1000000) { \ - (vvp)->tv_sec++; \ - (vvp)->tv_usec -= 1000000; \ - } \ - } while (0) -#endif - -#ifndef timersub -#define timersub(tvp, uvp, vvp) \ - do { \ - (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \ - (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \ - if ((vvp)->tv_usec < 0) { \ - (vvp)->tv_sec--; \ - (vvp)->tv_usec += 1000000; \ - } \ - } while (0) -#endif - - -// #define THREADS_USESIGNALS - /* - * Thread stuff + * Things we need to keep track of per-thread instead of per-session */ - -enum CtdlThreadState { - CTDL_THREAD_INVALID, - CTDL_THREAD_VALID, - CTDL_THREAD_CREATE, - CTDL_THREAD_CANCELLED, - CTDL_THREAD_EXITED, - CTDL_THREAD_STOPPING, - CTDL_THREAD_STOP_REQ, /* Do NOT put any running states before this state */ - CTDL_THREAD_SLEEPING, - CTDL_THREAD_BLOCKED, - CTDL_THREAD_RUNNING, - CTDL_THREAD_LAST_STATE +struct thread_tsd { + DB_TXN *tid; /* Transaction handle */ + DBC *cursors[MAXCDB]; /* Cursors, for traversals... */ }; -typedef struct CtdlThreadNode CtdlThreadNode; - -struct CtdlThreadNode{ - citthread_t tid; /* id as returned by citthread_create() */ - pid_t pid; /* pid, as best the OS will let us determine */ - long reltid; /* counting from start... */ - time_t when; /* When to start a scheduled thread */ - struct CitContext *Context; /* The session context that this thread mught be working on or NULL if none */ - long number; /* A unigue number for this thread (not implimented yet) */ - int wakefd_recv; /* An fd that this thread can sleep on (not implimented yet) */ - int wakefd_send; /* An fd that this thread can send out on (Not implimented yet) */ - int signal; /* A field to store a signal we caught. */ - const char *name; /* A name for this thread */ - void *(*thread_func) (void *arg); /* The actual function that does this threads work */ - void *user_args; /* Arguments passed to this threads work function */ - long flags; /* Flags that describe this thread */ - enum CtdlThreadState state; /* Flag to show state of this thread */ - time_t stop_ticker; /* A counter to determine how long it has taken for this thread to exit */ - citthread_mutex_t ThreadMutex; /* A mutex to sync this thread to others if this thread allows (also used for sleeping) */ - citthread_cond_t ThreadCond; /* A condition variable to sync this thread with others */ - citthread_mutex_t SleepMutex; /* A mutex for sleeping */ - citthread_cond_t SleepCond; /* A condition variable for sleeping */ - citthread_attr_t attr; /* Attributes of this thread */ - struct timeval start_time; /* Time this thread was started */ - struct timeval last_state_change; /* Time when this thread last changed state */ - double avg_sleeping; /* Average sleeping time */ - double avg_running; /* Average running time */ - double avg_blocked; /* Average blocked time */ - double load_avg; /* Load average for this thread */ - CtdlThreadNode *prev; /* Previous thread in the thread table */ - CtdlThreadNode *next; /* Next thread in the thread table */ -} ; - -extern CtdlThreadNode *CtdlThreadList; - -typedef struct ThreadTSD ThreadTSD; - -struct ThreadTSD { - DB_TXN *tid; /* Transaction handle */ - DBC *cursors[MAXCDB]; /* Cursors, for traversals... */ - CtdlThreadNode *self; /* Pointer to this threads control structure */ -} ; - -extern double CtdlThreadLoadAvg; -extern double CtdlThreadWorkerAvg; -extern long statcount; /* are we doing a stats check? */ -extern citthread_key_t ThreadKey; -void ctdl_thread_internal_init_tsd(void); -void ctdl_internal_thread_gc (void); -void ctdl_thread_internal_init(void); -void ctdl_thread_internal_cleanup(void); -void ctdl_thread_internal_calc_loadavg(void); -void ctdl_thread_internal_free_tsd(void); -CtdlThreadNode *ctdl_internal_create_thread(char *name, long flags, void *(*thread_func) (void *arg), void *args); -void ctdl_thread_internal_check_scheduled(void); -void ctdl_thread_internal_change_state (CtdlThreadNode *this_thread, enum CtdlThreadState new_state); +extern struct thread_tsd masterTSD; +#define TSD MyThread() -void InitialiseSemaphores(void); +struct thread_tsd *MyThread(void); int try_critical_section (int which_one); void begin_critical_section (int which_one); void end_critical_section (int which_one); void go_threading(void); +int CtdlThreadCheckStop(void); +void CtdlThreadStopAll(void); +void InitializeMasterTSD(void); +void CtdlThreadCreate(void *(*start_routine)(void*)); #endif // THREADS_H