From 40170e6219348ea235429a4c830b54bc8a16eed3 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Mon, 21 Nov 2011 19:41:29 +0100 Subject: [PATCH] fix removal from Queu - we use a long value, use LKEY everywhere - if the lookup of the HashPos failed, don't delete the first entry - use the payload message ID as persistant identifier; the QueueMessageID is subject to be changed after each successfull delivery. - add some more logging --- citadel/modules/smtp/serv_smtpeventclient.c | 12 ++- citadel/modules/smtp/serv_smtpqueue.c | 83 ++++++++++++++------- 2 files changed, 64 insertions(+), 31 deletions(-) diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index 9c51a1bc5..94c075be0 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -129,7 +129,7 @@ void FinalizeMessageSend(SmtpOutMsg *Msg) int nRemain; StrBuf *MsgData; AsyncIO *IO = &Msg->IO; - EV_syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__); + EVS_syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__); IDestructQueItem = DecreaseQReference(Msg->MyQItem); @@ -486,6 +486,9 @@ void smtp_try_one_queue_entry(OneQueItem *MyQItem, SubC->session_specific_data = (char*) SendMsg; SendMsg->IO.CitContext = SubC; + syslog(LOG_DEBUG, "SMTP Starting: [%ld] \n", + SendMsg->MyQItem->MessageID, + ChrPtr(SendMsg->MyQEntry->Recipient)); if (SendMsg->pCurrRelay == NULL) QueueEventContext(&SendMsg->IO, resolve_mx_records); @@ -568,8 +571,11 @@ eNextState SMTP_C_DispatchReadDone(AsyncIO *IO) eNextState rc; rc = ReadHandlers[pMsg->State](pMsg); - pMsg->State++; - SMTPSetTimeout(rc, pMsg); + if (rc != eAbort) + { + pMsg->State++; + SMTPSetTimeout(rc, pMsg); + } return rc; } eNextState SMTP_C_DispatchWriteDone(AsyncIO *IO) diff --git a/citadel/modules/smtp/serv_smtpqueue.c b/citadel/modules/smtp/serv_smtpqueue.c index 96e6ff1ea..5f22ee6fb 100644 --- a/citadel/modules/smtp/serv_smtpqueue.c +++ b/citadel/modules/smtp/serv_smtpqueue.c @@ -129,13 +129,24 @@ int DecreaseQReference(OneQueItem *MyQItem) void RemoveQItem(OneQueItem *MyQItem) { + long len; + const char* Key; + void *VData; HashPos *It; - It = GetNewHashPos(MyQItem->MailQEntries, 0); pthread_mutex_lock(&ActiveQItemsLock); - { - GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); + It = GetNewHashPos(ActiveQItems, 0); + if (GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It)) DeleteEntryFromHash(ActiveQItems, It); + else + { + syslog(LOG_WARNING, + "SMTP cleanup: unable to find QItem with ID[%ld]", + MyQItem->MessageID); + while (GetNextHashPos(ActiveQItems, It, &len, &Key, &VData)) + syslog(LOG_WARNING, + "SMTP cleanup: have_: ID[%ld]", + ((OneQueItem *)VData)->MessageID); } pthread_mutex_unlock(&ActiveQItemsLock); DeleteHashPos(&It); @@ -208,25 +219,6 @@ OneQueItem *DeserializeQueueItem(StrBuf *RawQItem, long QueMsgID) Item->MessageID = -1; Item->QueMsgID = QueMsgID; - pthread_mutex_lock(&ActiveQItemsLock); - if (GetHash(ActiveQItems, - IKEY(QueMsgID), - &v)) - { - /* WHOOPS. somebody else is already working on this. */ - pthread_mutex_unlock(&ActiveQItemsLock); - FreeQueItem(&Item); - return NULL; - } - else { - /* mark our claim on this. */ - Put(ActiveQItems, - IKEY(Item->QueMsgID), - Item, - HFreeQueItem); - pthread_mutex_unlock(&ActiveQItemsLock); - } - Token = NewStrBuf(); Line = NewStrBufPlain(NULL, 128); while (pLine != StrBufNOTNULL) { @@ -245,6 +237,26 @@ OneQueItem *DeserializeQueueItem(StrBuf *RawQItem, long QueMsgID) } FreeStrBuf(&Line); FreeStrBuf(&Token); + + pthread_mutex_lock(&ActiveQItemsLock); + if (GetHash(ActiveQItems, + LKEY(Item->MessageID), + &v)) + { + /* WHOOPS. somebody else is already working on this. */ + pthread_mutex_unlock(&ActiveQItemsLock); + FreeQueItem(&Item); + return NULL; + } + else { + /* mark our claim on this. */ + Put(ActiveQItems, + LKEY(Item->MessageID), + Item, + HFreeQueItem); + pthread_mutex_unlock(&ActiveQItemsLock); + } + return Item; } @@ -325,7 +337,7 @@ void NewMailQEntry(OneQueItem *Item) void QItem_Handle_MsgID(OneQueItem *Item, StrBuf *Line, const char **Pos) { - Item->MessageID = StrBufExtractNext_int(Line, Pos, '|'); + Item->MessageID = StrBufExtractNext_long(Line, Pos, '|'); } void QItem_Handle_EnvelopeFrom(OneQueItem *Item, StrBuf *Line, const char **Pos) @@ -659,7 +671,7 @@ void smtp_do_procmsg(long msgnum, void *userdata) { It = GetNewHashPos(MyQItem->MailQEntries, 0); pthread_mutex_lock(&ActiveQItemsLock); { - GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); + GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It); DeleteEntryFromHash(ActiveQItems, It); } pthread_mutex_unlock(&ActiveQItemsLock); @@ -676,7 +688,7 @@ void smtp_do_procmsg(long msgnum, void *userdata) { It = GetNewHashPos(MyQItem->MailQEntries, 0); pthread_mutex_lock(&ActiveQItemsLock); { - GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); + GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It); DeleteEntryFromHash(ActiveQItems, It); } pthread_mutex_unlock(&ActiveQItemsLock); @@ -755,7 +767,8 @@ void smtp_do_procmsg(long msgnum, void *userdata) { int KeepBuffers = (i == m); if (i > 1) n = MsgCount++; syslog(LOG_DEBUG, - "SMTP Queue: Trying <%s> %d / %d \n", + "SMTP Queue: Trying <%ld> <%s> %d / %d \n", + MyQItem->MessageID, ChrPtr(ThisItem->Recipient), i, m); @@ -776,8 +789,22 @@ void smtp_do_procmsg(long msgnum, void *userdata) { It = GetNewHashPos(MyQItem->MailQEntries, 0); pthread_mutex_lock(&ActiveQItemsLock); { - GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It); - DeleteEntryFromHash(ActiveQItems, It); + if (GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It)) + DeleteEntryFromHash(ActiveQItems, It); + else + { + long len; + const char* Key; + void *VData; + syslog(LOG_WARNING, + "SMTP cleanup: unable to find QItem with ID[%ld]", + MyQItem->MessageID); + while (GetNextHashPos(ActiveQItems, It, &len, &Key, &VData)) + syslog(LOG_WARNING, + "SMTP cleanup: have: ID[%ld]", + ((OneQueItem *)VData)->MessageID); + } + } pthread_mutex_unlock(&ActiveQItemsLock); DeleteHashPos(&It); -- 2.30.2