- add a mutex for securely accessing the eventqueues from outside via ev_async_send() (s.b. might want to shut down meanwhile)
- remove unused add-pipes we used for signaling before switching to libev4
- start the shutdown handlers only from within the eventqueue
*----------------------------------------------------------------------------*/
extern int evdb_count;
extern pthread_mutex_t DBEventQueueMutex;
+extern pthread_mutex_t DBEventExitQueueMutex;
extern HashList *DBInboundEventQueue;
extern struct ev_loop *event_db;
extern ev_async DBAddJob;
ev_cleanup_init(&IO->db_abort_by_shutdown,
IO_abort_shutdown_callback);
IO->db_abort_by_shutdown.data = IO;
- ev_cleanup_start(event_db, &IO->db_abort_by_shutdown);
pthread_mutex_lock(&DBEventQueueMutex);
+ if (DBInboundEventQueue == NULL)
+ {
+ /* shutting down... */
+ free(h);
+ EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
+ pthread_mutex_unlock(&DBEventQueueMutex);
+ return eAbort;
+ }
EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
i = ++evdb_count ;
Put(DBInboundEventQueue, IKEY(i), h, NULL);
pthread_mutex_unlock(&DBEventQueueMutex);
+ pthread_mutex_lock(&DBEventExitQueueMutex);
+ if (event_db == NULL)
+ {
+ pthread_mutex_unlock(&DBEventExitQueueMutex);
+ return eAbort;
+ }
ev_async_send (event_db, &DBAddJob);
+ pthread_mutex_unlock(&DBEventExitQueueMutex);
+
EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
return eDBQuery;
}
+void StopDBWatchers(AsyncIO *IO)
+{
+ ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
+ ev_idle_stop(event_db, &IO->db_unwind_stack);
+}
+
void ShutDownDBCLient(AsyncIO *IO)
{
CitContext *Ctx =IO->CitContext;
become_session(Ctx);
EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
- ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
+ StopDBWatchers(IO);
assert(IO->DBTerminate);
IO->DBTerminate(IO);
*----------------------------------------------------------------------------*/
extern int evbase_count;
extern pthread_mutex_t EventQueueMutex;
+extern pthread_mutex_t EventExitQueueMutex;
extern HashList *InboundEventQueue;
extern struct ev_loop *event_base;
extern ev_async AddJob;
ev_cleanup_init(&IO->abort_by_shutdown,
IO_abort_shutdown_callback);
IO->abort_by_shutdown.data = IO;
- ev_cleanup_start(event_base, &IO->abort_by_shutdown);
pthread_mutex_lock(&EventQueueMutex);
+ if (InboundEventQueue == NULL)
+ {
+ free(h);
+ /* shutting down... */
+ EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
+ pthread_mutex_unlock(&EventQueueMutex);
+ return eAbort;
+ }
EVM_syslog(LOG_DEBUG, "EVENT Q\n");
i = ++evbase_count;
Put(InboundEventQueue, IKEY(i), h, NULL);
pthread_mutex_unlock(&EventQueueMutex);
+ pthread_mutex_lock(&EventExitQueueMutex);
+ if (event_base == NULL) {
+ pthread_mutex_unlock(&EventExitQueueMutex);
+ return eAbort;
+ }
ev_async_send (event_base, &AddJob);
+ pthread_mutex_unlock(&EventExitQueueMutex);
EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
return eSendReply;
}
h->EvAttch = evcurl_handle_start;
pthread_mutex_lock(&EventQueueMutex);
+ if (InboundEventQueue == NULL)
+ {
+ /* shutting down... */
+ free(h);
+ EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
+ pthread_mutex_unlock(&EventQueueMutex);
+ return eAbort;
+ }
+
EVM_syslog(LOG_DEBUG, "EVENT Q\n");
i = ++evbase_count;
Put(InboundEventQueue, IKEY(i), h, NULL);
pthread_mutex_unlock(&EventQueueMutex);
+ pthread_mutex_lock(&EventExitQueueMutex);
+ if (event_base == NULL) {
+ pthread_mutex_unlock(&EventExitQueueMutex);
+ return eAbort;
+ }
ev_async_send (event_base, &AddJob);
+ pthread_mutex_unlock(&EventExitQueueMutex);
+
EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
return eSendReply;
}
ev_timer_stop (event_base, &IO->rw_timeout);
ev_timer_stop(event_base, &IO->conn_fail);
ev_idle_stop(event_base, &IO->unwind_stack);
+ ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
+
+ ev_io_stop(event_base, &IO->conn_event);
+ ev_io_stop(event_base, &IO->send_event);
+ ev_io_stop(event_base, &IO->recv_event);
+
+ if (IO->SendBuf.fd != 0) {
+ close(IO->SendBuf.fd);
+ }
+ IO->SendBuf.fd = 0;
+ IO->RecvBuf.fd = 0;
+}
+
+void StopCurlWatchers(AsyncIO *IO)
+{
+ ev_timer_stop (event_base, &IO->rw_timeout);
+ ev_timer_stop(event_base, &IO->conn_fail);
+ ev_idle_stop(event_base, &IO->unwind_stack);
+ ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
ev_io_stop(event_base, &IO->conn_event);
ev_io_stop(event_base, &IO->send_event);
EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
- ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
StopClientWatchers(IO);
if (IO->DNS.Channel != NULL) {
eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB);
eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB);
+void StopDBWatchers(AsyncIO *IO);
eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB);
eNextState QueueCurlContext(AsyncIO *IO);
AsyncIO *IO = data;
EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__);
- EV_DNS_LOGT_STOP(DNS.timeout);
+ EV_DNS_LOGT_STOP(DNS.timeout);
ev_timer_stop (event_base, &IO->DNS.timeout);
IO->DNS.Query->DNSStatus = status;
AsyncIO *IO = arg;
EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__);
- EV_DNS_LOGT_STOP(DNS.timeout);
+ EV_DNS_LOGT_STOP(DNS.timeout);
ev_timer_stop (event_base, &IO->DNS.timeout);
IO->DNS.Query->DNSStatus = status;
void QueryCbDone(AsyncIO *IO)
{
EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__);
+
EV_DNS_LOGT_STOP(DNS.timeout);
+ ev_timer_stop (event_base, &IO->DNS.timeout);
+ EV_DNS_LOGT_STOP(unwind_stack);
ev_idle_stop(event_base, &IO->unwind_stack);
}
void DestructCAres(AsyncIO *IO)
{
EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__);
- EV_DNS_LOGT_STOP(DNS.timeout);
EV_DNS_LOG_STOP(DNS.recv_event);
ev_io_stop(event_base, &IO->DNS.recv_event);
+
EV_DNS_LOG_STOP(DNS.send_event);
ev_io_stop(event_base, &IO->DNS.send_event);
+
+ EV_DNS_LOGT_STOP(DNS.timeout);
ev_timer_stop (event_base, &IO->DNS.timeout);
+
+ EV_DNS_LOGT_STOP(unwind_stack);
ev_idle_stop(event_base, &IO->unwind_stack);
ares_destroy_options(&IO->DNS.Options);
}
IO->DNS.Query->VParsedDNSReply = hostent;
IO->DNS.Query->DNSReplyFree = (FreeDNSReply) ares_free_hostent;
+ EV_DNS_LOGT_STOP(DNS.timeout);
+ ev_timer_stop (event_base, &IO->DNS.timeout);
+
ev_idle_init(&IO->unwind_stack,
IO_postdns_callback);
IO->unwind_stack.data = IO;
EV_DNS_LOGT_INIT(unwind_stack);
EV_DNS_LOGT_START(unwind_stack);
ev_idle_start(event_base, &IO->unwind_stack);
- ev_timer_stop (event_base, &IO->DNS.timeout);
+
}
void QueueGetHostByName(AsyncIO *IO,
continue;
}
IO = (AsyncIO *)chandle;
+ if (IO->ID == 0) {
+ EVCURL_syslog(LOG_ERR,
+ "Error, invalid IO context %p\n",
+ IO);
+ continue;
+ }
EVCURLM_syslog(LOG_DEBUG, "request complete\n");
"error asking curl for "
"response code from request: %s\n",
curl_easy_strerror(sta));
- EVCURL_syslog(LOG_ERR,
+ EVCURL_syslog(LOG_DEBUG,
"http response code was %ld\n",
(long)IO->HttpReq.httpcode);
"%s\n",
curl_multi_strerror(msta));
- ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
+ ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
IO->HttpReq.attached = 0;
switch(IO->SendDone(IO))
{
CURLMcode msta;
AsyncIO *IO = watcher->data;
+
+ if (IO == NULL)
+ return;
IO->Now = ev_now(event_base);
EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
IO->HttpReq.attached = 1;
ev_async_send (event_base, &WakeupCurl);
+
ev_cleanup_init(&IO->abort_by_shutdown,
IOcurl_abort_shutdown_callback);
ev_cleanup_start(event_base, &IO->abort_by_shutdown);
+
return eReadMessage;
}
* this currently is the main loop (which may change in some future?)
*/
int evbase_count = 0;
-int event_add_pipe[2] = {-1, -1};
pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
+pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
HashList *QueueEvents = NULL;
HashList *InboundEventQueue = NULL;
HashList *InboundEventQueues[2] = { NULL, NULL };
}
if (h->IO->StartIO == 0.0)
h->IO->StartIO = Now;
+
h->IO->Now = Now;
h->EvAttch(h->IO);
}
void InitEventQueue(void)
{
- struct rlimit LimitSet;
-
pthread_mutex_init(&EventQueueMutex, NULL);
-
- if (pipe(event_add_pipe) != 0) {
- syslog(LOG_EMERG,
- "Unable to create pipe for libev queueing: %s\n",
- strerror(errno));
- abort();
- }
- LimitSet.rlim_cur = 1;
- LimitSet.rlim_max = 1;
- setrlimit(event_add_pipe[1], &LimitSet);
+ pthread_mutex_init(&EventExitQueueMutex, NULL);
QueueEvents = NewHash(1, Flathash);
InboundEventQueues[0] = NewHash(1, Flathash);
EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
///what todo here? CtdlClearSystemContext();
+ pthread_mutex_lock(&EventExitQueueMutex);
ev_loop_destroy (EV_DEFAULT_UC);
+ event_base = NULL;
DeleteHash(&QueueEvents);
InboundEventQueue = NULL;
DeleteHash(&InboundEventQueues[0]);
DeleteHash(&InboundEventQueues[1]);
/* citthread_mutex_destroy(&EventQueueMutex); TODO */
evcurl_shutdown();
- close(event_add_pipe[0]);
- close(event_add_pipe[1]);
CtdlDestroyEVCleanupHooks();
- EVQShutDown = 1;
+ pthread_mutex_unlock(&EventExitQueueMutex);
+ EVQShutDown = 1;
return(NULL);
}
*/
ev_loop *event_db;
int evdb_count = 0;
-int evdb_add_pipe[2] = {-1, -1};
pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
+pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
HashList *DBQueueEvents = NULL;
HashList *DBInboundEventQueue = NULL;
HashList *DBInboundEventQueues[2] = { NULL, NULL };
if (h->IO->StartDB == 0.0)
h->IO->StartDB = Now;
h->IO->Now = Now;
+ ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
rc = h->EvAttch(h->IO);
switch (rc)
{
void DBInitEventQueue(void)
{
- struct rlimit LimitSet;
-
pthread_mutex_init(&DBEventQueueMutex, NULL);
-
- if (pipe(evdb_add_pipe) != 0) {
- EVQ_syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno));
- abort();
- }
- LimitSet.rlim_cur = 1;
- LimitSet.rlim_max = 1;
- setrlimit(evdb_add_pipe[1], &LimitSet);
+ pthread_mutex_init(&DBEventExitQueueMutex, NULL);
DBQueueEvents = NewHash(1, Flathash);
DBInboundEventQueues[0] = NewHash(1, Flathash);
*/
void *db_event_thread(void *arg)
{
+ ev_loop *tmp;
struct CitContext libev_msg_CC;
CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
-// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+
EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
- event_db = ev_loop_new (EVFLAG_AUTO);
+ tmp = event_db = ev_loop_new (EVFLAG_AUTO);
ev_async_init(&DBAddJob, DBQueueEventAddCallback);
ev_async_start(event_db, &DBAddJob);
ev_run (event_db, 0);
- EVQM_syslog(LOG_DEBUG, "dbevent_thread() exiting\n");
+ pthread_mutex_lock(&DBEventExitQueueMutex);
-//// what to do here? CtdlClearSystemContext();
- ev_loop_destroy (event_db);
+ event_db = NULL;
+ EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n");
DeleteHash(&DBQueueEvents);
DBInboundEventQueue = NULL;
DeleteHash(&DBInboundEventQueues[0]);
DeleteHash(&DBInboundEventQueues[1]);
- close(evdb_add_pipe[0]);
- close(evdb_add_pipe[1]);
/* citthread_mutex_destroy(&DBEventQueueMutex); TODO */
+ ev_loop_destroy (tmp);
+ pthread_mutex_unlock(&DBEventExitQueueMutex);
return(NULL);
}
}
FreeAsyncIOContents(&RSSAggr->IO);
+ memset(RSSAggr, 0, sizeof(rss_aggregator));
free(RSSAggr);
}
EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
-
+ StopCurlWatchers(IO);
UnlinkRSSAggregator(RSSAggr);
return eAbort;
}
EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
+ StopDBWatchers(&RSSAggr->IO);
UnlinkRSSAggregator(RSSAggr);
return eAbort;
}
return eAbort;
}
-
-eNextState AbortNetworkSaveMessage (AsyncIO *IO)
-{
- return eAbort; ///TODO
-}
-
eNextState RSSSaveMessage(AsyncIO *IO)
{
long len;