- we use a long value, use LKEY everywhere
- if the lookup of the HashPos failed, don't delete the first entry
- use the payload message ID as persistant identifier; the QueueMessageID is subject to be changed after each successfull delivery.
- add some more logging
int nRemain;
StrBuf *MsgData;
AsyncIO *IO = &Msg->IO;
int nRemain;
StrBuf *MsgData;
AsyncIO *IO = &Msg->IO;
- EV_syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__);
+ EVS_syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__);
IDestructQueItem = DecreaseQReference(Msg->MyQItem);
IDestructQueItem = DecreaseQReference(Msg->MyQItem);
SubC->session_specific_data = (char*) SendMsg;
SendMsg->IO.CitContext = SubC;
SubC->session_specific_data = (char*) SendMsg;
SendMsg->IO.CitContext = SubC;
+ syslog(LOG_DEBUG, "SMTP Starting: [%ld] \n",
+ SendMsg->MyQItem->MessageID,
+ ChrPtr(SendMsg->MyQEntry->Recipient));
if (SendMsg->pCurrRelay == NULL)
QueueEventContext(&SendMsg->IO,
resolve_mx_records);
if (SendMsg->pCurrRelay == NULL)
QueueEventContext(&SendMsg->IO,
resolve_mx_records);
eNextState rc;
rc = ReadHandlers[pMsg->State](pMsg);
eNextState rc;
rc = ReadHandlers[pMsg->State](pMsg);
- pMsg->State++;
- SMTPSetTimeout(rc, pMsg);
+ if (rc != eAbort)
+ {
+ pMsg->State++;
+ SMTPSetTimeout(rc, pMsg);
+ }
return rc;
}
eNextState SMTP_C_DispatchWriteDone(AsyncIO *IO)
return rc;
}
eNextState SMTP_C_DispatchWriteDone(AsyncIO *IO)
void RemoveQItem(OneQueItem *MyQItem)
{
void RemoveQItem(OneQueItem *MyQItem)
{
+ long len;
+ const char* Key;
+ void *VData;
- It = GetNewHashPos(MyQItem->MailQEntries, 0);
pthread_mutex_lock(&ActiveQItemsLock);
pthread_mutex_lock(&ActiveQItemsLock);
- {
- GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
+ It = GetNewHashPos(ActiveQItems, 0);
+ if (GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It))
DeleteEntryFromHash(ActiveQItems, It);
DeleteEntryFromHash(ActiveQItems, It);
+ else
+ {
+ syslog(LOG_WARNING,
+ "SMTP cleanup: unable to find QItem with ID[%ld]",
+ MyQItem->MessageID);
+ while (GetNextHashPos(ActiveQItems, It, &len, &Key, &VData))
+ syslog(LOG_WARNING,
+ "SMTP cleanup: have_: ID[%ld]",
+ ((OneQueItem *)VData)->MessageID);
}
pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
}
pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
Item->MessageID = -1;
Item->QueMsgID = QueMsgID;
Item->MessageID = -1;
Item->QueMsgID = QueMsgID;
- pthread_mutex_lock(&ActiveQItemsLock);
- if (GetHash(ActiveQItems,
- IKEY(QueMsgID),
- &v))
- {
- /* WHOOPS. somebody else is already working on this. */
- pthread_mutex_unlock(&ActiveQItemsLock);
- FreeQueItem(&Item);
- return NULL;
- }
- else {
- /* mark our claim on this. */
- Put(ActiveQItems,
- IKEY(Item->QueMsgID),
- Item,
- HFreeQueItem);
- pthread_mutex_unlock(&ActiveQItemsLock);
- }
-
Token = NewStrBuf();
Line = NewStrBufPlain(NULL, 128);
while (pLine != StrBufNOTNULL) {
Token = NewStrBuf();
Line = NewStrBufPlain(NULL, 128);
while (pLine != StrBufNOTNULL) {
}
FreeStrBuf(&Line);
FreeStrBuf(&Token);
}
FreeStrBuf(&Line);
FreeStrBuf(&Token);
+
+ pthread_mutex_lock(&ActiveQItemsLock);
+ if (GetHash(ActiveQItems,
+ LKEY(Item->MessageID),
+ &v))
+ {
+ /* WHOOPS. somebody else is already working on this. */
+ pthread_mutex_unlock(&ActiveQItemsLock);
+ FreeQueItem(&Item);
+ return NULL;
+ }
+ else {
+ /* mark our claim on this. */
+ Put(ActiveQItems,
+ LKEY(Item->MessageID),
+ Item,
+ HFreeQueItem);
+ pthread_mutex_unlock(&ActiveQItemsLock);
+ }
+
void QItem_Handle_MsgID(OneQueItem *Item, StrBuf *Line, const char **Pos)
{
void QItem_Handle_MsgID(OneQueItem *Item, StrBuf *Line, const char **Pos)
{
- Item->MessageID = StrBufExtractNext_int(Line, Pos, '|');
+ Item->MessageID = StrBufExtractNext_long(Line, Pos, '|');
}
void QItem_Handle_EnvelopeFrom(OneQueItem *Item, StrBuf *Line, const char **Pos)
}
void QItem_Handle_EnvelopeFrom(OneQueItem *Item, StrBuf *Line, const char **Pos)
It = GetNewHashPos(MyQItem->MailQEntries, 0);
pthread_mutex_lock(&ActiveQItemsLock);
{
It = GetNewHashPos(MyQItem->MailQEntries, 0);
pthread_mutex_lock(&ActiveQItemsLock);
{
- GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
+ GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It);
DeleteEntryFromHash(ActiveQItems, It);
}
pthread_mutex_unlock(&ActiveQItemsLock);
DeleteEntryFromHash(ActiveQItems, It);
}
pthread_mutex_unlock(&ActiveQItemsLock);
It = GetNewHashPos(MyQItem->MailQEntries, 0);
pthread_mutex_lock(&ActiveQItemsLock);
{
It = GetNewHashPos(MyQItem->MailQEntries, 0);
pthread_mutex_lock(&ActiveQItemsLock);
{
- GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
+ GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It);
DeleteEntryFromHash(ActiveQItems, It);
}
pthread_mutex_unlock(&ActiveQItemsLock);
DeleteEntryFromHash(ActiveQItems, It);
}
pthread_mutex_unlock(&ActiveQItemsLock);
int KeepBuffers = (i == m);
if (i > 1) n = MsgCount++;
syslog(LOG_DEBUG,
int KeepBuffers = (i == m);
if (i > 1) n = MsgCount++;
syslog(LOG_DEBUG,
- "SMTP Queue: Trying <%s> %d / %d \n",
+ "SMTP Queue: Trying <%ld> <%s> %d / %d \n",
+ MyQItem->MessageID,
ChrPtr(ThisItem->Recipient),
i,
m);
ChrPtr(ThisItem->Recipient),
i,
m);
It = GetNewHashPos(MyQItem->MailQEntries, 0);
pthread_mutex_lock(&ActiveQItemsLock);
{
It = GetNewHashPos(MyQItem->MailQEntries, 0);
pthread_mutex_lock(&ActiveQItemsLock);
{
- GetHashPosFromKey(ActiveQItems, IKEY(MyQItem->MessageID), It);
- DeleteEntryFromHash(ActiveQItems, It);
+ if (GetHashPosFromKey(ActiveQItems, LKEY(MyQItem->MessageID), It))
+ DeleteEntryFromHash(ActiveQItems, It);
+ else
+ {
+ long len;
+ const char* Key;
+ void *VData;
+ syslog(LOG_WARNING,
+ "SMTP cleanup: unable to find QItem with ID[%ld]",
+ MyQItem->MessageID);
+ while (GetNextHashPos(ActiveQItems, It, &len, &Key, &VData))
+ syslog(LOG_WARNING,
+ "SMTP cleanup: have: ID[%ld]",
+ ((OneQueItem *)VData)->MessageID);
+ }
+
}
pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);
}
pthread_mutex_unlock(&ActiveQItemsLock);
DeleteHashPos(&It);