#include "event_client.h"
-citthread_mutex_t ActiveQItemsLock;
+struct CitContext smtp_queue_CC;
+pthread_mutex_t ActiveQItemsLock;
HashList *ActiveQItems = NULL;
HashList *QItemHandlers = NULL;
void smtp_evq_cleanup(void)
{
- citthread_mutex_lock(&ActiveQItemsLock);
+
+ pthread_mutex_lock(&ActiveQItemsLock);
DeleteHash(&QItemHandlers);
DeleteHash(&ActiveQItems);
- citthread_mutex_unlock(&ActiveQItemsLock);
- citthread_mutex_destroy(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+ CtdlClearSystemContext();
+/* citthread_mutex_destroy(&ActiveQItemsLock); TODO */
}
int DecreaseQReference(OneQueItem *MyQItem)
{
int IDestructQueItem;
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
MyQItem->ActiveDeliveries--;
IDestructQueItem = MyQItem->ActiveDeliveries == 0;
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
return IDestructQueItem;
}
HashPos *It;
It = GetNewHashPos(MyQItem->MailQEntries, 0);
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
{
GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
DeleteEntryFromHash(ActiveQItems, It);
}
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
}
Item->MessageID = -1;
Item->QueMsgID = QueMsgID;
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
if (GetHash(ActiveQItems,
IKEY(QueMsgID),
&v))
{
/* WHOOPS. somebody else is already working on this. */
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
FreeQueItem(&Item);
return NULL;
}
IKEY(Item->QueMsgID),
Item,
HFreeQueItem);
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
}
Token = NewStrBuf();
syslog(LOG_DEBUG, "SMTP client: Retry time not yet reached.\n");
It = GetNewHashPos(MyQItem->MailQEntries, 0);
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
{
GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
DeleteEntryFromHash(ActiveQItems, It);
}
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
DeleteHashPos(&It);
return;
if (MyQItem->MessageID < 0L) {
syslog(LOG_ERR, "SMTP Queue: no 'msgid' directive found!\n");
It = GetNewHashPos(MyQItem->MailQEntries, 0);
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
{
GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
DeleteEntryFromHash(ActiveQItems, It);
}
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
return;
else
{
It = GetNewHashPos(MyQItem->MailQEntries, 0);
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
{
GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
DeleteEntryFromHash(ActiveQItems, It);
}
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
*
* Run through the queue sending out messages.
*/
-void *smtp_queue_thread(void *arg) {
+void smtp_do_queue(void) {
+ static int is_running = 0;
int num_processed = 0;
- struct CitContext smtp_queue_CC;
- CtdlThreadSleep(10);
+ if (is_running) return; /* Concurrency check - only one can run */
+ is_running = 1;
- CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send");
- citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
- syslog(LOG_DEBUG, "smtp_queue_thread() initializing\n");
-
- while (!CtdlThreadCheckStop()) {
-
- syslog(LOG_INFO, "SMTP client: processing outbound queue\n");
+ pthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+ syslog(LOG_INFO, "SMTP client: processing outbound queue");
- if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) {
- syslog(LOG_ERR, "Cannot find room <%s>\n", SMTP_SPOOLOUT_ROOM);
- }
- else {
- num_processed = CtdlForEachMessage(MSGS_ALL, 0L, NULL, SPOOLMIME, NULL, smtp_do_procmsg, NULL);
- }
- syslog(LOG_INFO, "SMTP client: queue run completed; %d messages processed\n", num_processed);
- CtdlThreadSleep(60);
+ if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) {
+ syslog(LOG_ERR, "Cannot find room <%s>", SMTP_SPOOLOUT_ROOM);
}
-
- CtdlClearSystemContext();
- return(NULL);
+ else {
+ num_processed = CtdlForEachMessage(MSGS_ALL, 0L, NULL, SPOOLMIME, NULL, smtp_do_procmsg, NULL);
+ }
+ syslog(LOG_INFO, "SMTP client: queue run completed; %d messages processed", num_processed);
+ run_queue_now = 0;
+ is_running = 0;
}
}
-
-
CTDL_MODULE_INIT(smtp_queu)
{
-#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
if (!threading)
{
+ CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send");
ActiveQItems = NewHash(1, lFlathash);
- citthread_mutex_init(&ActiveQItemsLock, NULL);
+ pthread_mutex_init(&ActiveQItemsLock, NULL);
QItemHandlers = NewHash(0, NULL);
smtp_init_spoolout();
CtdlRegisterCleanupHook(smtp_evq_cleanup);
- CtdlThreadCreate("SMTPEvent Send", CTDLTHREAD_BIGSTACK, smtp_queue_thread, NULL);
CtdlRegisterProtoHook(cmd_smtp, "SMTP", "SMTP utility commands");
}
-#endif
/* return our Subversion id for the Log */
return "smtpeventclient";