#include "event_client.h"
-citthread_mutex_t ActiveQItemsLock;
+struct CitContext smtp_queue_CC;
+pthread_mutex_t ActiveQItemsLock;
HashList *ActiveQItems = NULL;
HashList *QItemHandlers = NULL;
void smtp_evq_cleanup(void)
{
- citthread_mutex_lock(&ActiveQItemsLock);
+
+ pthread_mutex_lock(&ActiveQItemsLock);
DeleteHash(&QItemHandlers);
DeleteHash(&ActiveQItems);
- citthread_mutex_unlock(&ActiveQItemsLock);
- citthread_mutex_destroy(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+/* citthread_mutex_destroy(&ActiveQItemsLock); TODO */
}
int DecreaseQReference(OneQueItem *MyQItem)
{
int IDestructQueItem;
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
MyQItem->ActiveDeliveries--;
IDestructQueItem = MyQItem->ActiveDeliveries == 0;
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
return IDestructQueItem;
}
void RemoveQItem(OneQueItem *MyQItem)
{
+ long len;
+ const char* Key;
+ void *VData;
HashPos *It;
- It = GetNewHashPos(MyQItem->MailQEntries, 0);
- citthread_mutex_lock(&ActiveQItemsLock);
- {
- GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
+ pthread_mutex_lock(&ActiveQItemsLock);
+ It = GetNewHashPos(ActiveQItems, 0);
+ if (GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It))
DeleteEntryFromHash(ActiveQItems, It);
- }
- citthread_mutex_unlock(&ActiveQItemsLock);
+ else
+ {
+ syslog(LOG_WARNING,
+ "SMTP cleanup: unable to find QItem with ID[%ld]",
+ MyQItem->MessageID);
+ while (GetNextHashPos(ActiveQItems, It, &len, &Key, &VData))
+ syslog(LOG_WARNING,
+ "SMTP cleanup: have_: ID[%ld]",
+ ((OneQueItem *)VData)->MessageID);
+ }
+ pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
}
Item->MessageID = -1;
Item->QueMsgID = QueMsgID;
- citthread_mutex_lock(&ActiveQItemsLock);
- if (GetHash(ActiveQItems,
- IKEY(QueMsgID),
- &v))
- {
- /* WHOOPS. somebody else is already working on this. */
- citthread_mutex_unlock(&ActiveQItemsLock);
- FreeQueItem(&Item);
- return NULL;
- }
- else {
- /* mark our claim on this. */
- Put(ActiveQItems,
- IKEY(Item->QueMsgID),
- Item,
- HFreeQueItem);
- citthread_mutex_unlock(&ActiveQItemsLock);
- }
-
Token = NewStrBuf();
Line = NewStrBufPlain(NULL, 128);
while (pLine != StrBufNOTNULL) {
}
FreeStrBuf(&Line);
FreeStrBuf(&Token);
+
+ pthread_mutex_lock(&ActiveQItemsLock);
+ if (GetHash(ActiveQItems,
+ LKEY(Item->MessageID),
+ &v))
+ {
+ /* WHOOPS. somebody else is already working on this. */
+ pthread_mutex_unlock(&ActiveQItemsLock);
+ FreeQueItem(&Item);
+ return NULL;
+ }
+ else {
+ /* mark our claim on this. */
+ Put(ActiveQItems,
+ LKEY(Item->MessageID),
+ Item,
+ HFreeQueItem);
+ pthread_mutex_unlock(&ActiveQItemsLock);
+ }
+
return Item;
}
void QItem_Handle_MsgID(OneQueItem *Item, StrBuf *Line, const char **Pos)
{
- Item->MessageID = StrBufExtractNext_int(Line, Pos, '|');
+ Item->MessageID = StrBufExtractNext_long(Line, Pos, '|');
}
void QItem_Handle_EnvelopeFrom(OneQueItem *Item, StrBuf *Line, const char **Pos)
syslog(LOG_DEBUG, "SMTP client: Retry time not yet reached.\n");
It = GetNewHashPos(MyQItem->MailQEntries, 0);
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
{
- GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
+ GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It);
DeleteEntryFromHash(ActiveQItems, It);
}
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
DeleteHashPos(&It);
return;
if (MyQItem->MessageID < 0L) {
syslog(LOG_ERR, "SMTP Queue: no 'msgid' directive found!\n");
It = GetNewHashPos(MyQItem->MailQEntries, 0);
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
{
- GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
+ GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It);
DeleteEntryFromHash(ActiveQItems, It);
}
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
return;
int KeepBuffers = (i == m);
if (i > 1) n = MsgCount++;
syslog(LOG_DEBUG,
- "SMTP Queue: Trying <%s> %d / %d \n",
+ "SMTP Queue: Trying <%ld> <%s> %d / %d \n",
+ MyQItem->MessageID,
ChrPtr(ThisItem->Recipient),
i,
m);
else
{
It = GetNewHashPos(MyQItem->MailQEntries, 0);
- citthread_mutex_lock(&ActiveQItemsLock);
+ pthread_mutex_lock(&ActiveQItemsLock);
{
- GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
- DeleteEntryFromHash(ActiveQItems, It);
+ if (GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It))
+ DeleteEntryFromHash(ActiveQItems, It);
+ else
+ {
+ long len;
+ const char* Key;
+ void *VData;
+ syslog(LOG_WARNING,
+ "SMTP cleanup: unable to find QItem with ID[%ld]",
+ MyQItem->MessageID);
+ while (GetNextHashPos(ActiveQItems, It, &len, &Key, &VData))
+ syslog(LOG_WARNING,
+ "SMTP cleanup: have: ID[%ld]",
+ ((OneQueItem *)VData)->MessageID);
+ }
+
}
- citthread_mutex_unlock(&ActiveQItemsLock);
+ pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
////FreeQueItem(&MyQItem); TODO: DeleteEntryFromHash frees this?
*
* Run through the queue sending out messages.
*/
-void *smtp_queue_thread(void *arg) {
+void smtp_do_queue(void) {
+ static int is_running = 0;
int num_processed = 0;
- struct CitContext smtp_queue_CC;
-
- CtdlThreadSleep(10);
- CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send");
- citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
- syslog(LOG_DEBUG, "smtp_queue_thread() initializing\n");
+ if (is_running) return; /* Concurrency check - only one can run */
+ is_running = 1;
- while (!CtdlThreadCheckStop()) {
-
- syslog(LOG_INFO, "SMTP client: processing outbound queue\n");
+ pthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
+ syslog(LOG_INFO, "SMTP client: processing outbound queue");
- if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) {
- syslog(LOG_ERR, "Cannot find room <%s>\n", SMTP_SPOOLOUT_ROOM);
- }
- else {
- num_processed = CtdlForEachMessage(MSGS_ALL, 0L, NULL, SPOOLMIME, NULL, smtp_do_procmsg, NULL);
- }
- syslog(LOG_INFO, "SMTP client: queue run completed; %d messages processed\n", num_processed);
- CtdlThreadSleep(60);
+ if (CtdlGetRoom(&CC->room, SMTP_SPOOLOUT_ROOM) != 0) {
+ syslog(LOG_ERR, "Cannot find room <%s>", SMTP_SPOOLOUT_ROOM);
}
-
- CtdlClearSystemContext();
- return(NULL);
+ else {
+ num_processed = CtdlForEachMessage(MSGS_ALL, 0L, NULL, SPOOLMIME, NULL, smtp_do_procmsg, NULL);
+ }
+ syslog(LOG_INFO, "SMTP client: queue run completed; %d messages processed", num_processed);
+ run_queue_now = 0;
+ is_running = 0;
}
}
-
-
CTDL_MODULE_INIT(smtp_queu)
{
-#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
if (!threading)
{
+ CtdlFillSystemContext(&smtp_queue_CC, "SMTP_Send");
ActiveQItems = NewHash(1, lFlathash);
- citthread_mutex_init(&ActiveQItemsLock, NULL);
+ pthread_mutex_init(&ActiveQItemsLock, NULL);
QItemHandlers = NewHash(0, NULL);
smtp_init_spoolout();
CtdlRegisterCleanupHook(smtp_evq_cleanup);
- CtdlThreadCreate("SMTPEvent Send", CTDLTHREAD_BIGSTACK, smtp_queue_thread, NULL);
CtdlRegisterProtoHook(cmd_smtp, "SMTP", "SMTP utility commands");
+ CtdlRegisterSessionHook(smtp_do_queue, EVT_TIMER);
}
-#endif
/* return our Subversion id for the Log */
return "smtpeventclient";