From 682137147d66ec1ea06b9099c55e597c23b0e31f Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Wed, 17 Aug 2011 17:13:59 +0000 Subject: [PATCH] Add async message reading - CtdlSubmitMsg(): make param const, since its just read - add NewAsyncMsg(): preprocess the parameters provided by application logic, return the worker struct for message reading - add CtdlReadMessageBodyAsync(): call me from your IO handler to read a message from the socket asynchroneous. - add DeleteAsyncMsg(): call me to delete the worker vars. --- citadel/msgbase.c | 119 +++++++++++++++++++++++++++++++++++++++++++++- citadel/msgbase.h | 24 +++++++++- 2 files changed, 140 insertions(+), 3 deletions(-) diff --git a/citadel/msgbase.c b/citadel/msgbase.c index 88c63a802..13e1b1258 100644 --- a/citadel/msgbase.c +++ b/citadel/msgbase.c @@ -2927,7 +2927,7 @@ void ReplicationChecks(struct CtdlMessage *msg) { */ long CtdlSubmitMsg(struct CtdlMessage *msg, /* message to save */ struct recptypes *recps, /* recipients (if mail) */ - char *force, /* force a particular room? */ + const char *force, /* force a particular room? */ int flags /* should the message be exported clean? */ ) { char submit_filename[128]; @@ -3479,6 +3479,123 @@ StrBuf *CtdlReadMessageBodyBuf(char *terminator, /* token signalling EOT */ return Message; } +void DeleteAsyncMsg(ReadAsyncMsg **Msg) +{ + if (*Msg == NULL) + return; + FreeStrBuf(&(*Msg)->MsgBuf); + + free(*Msg); + *Msg = NULL; +} + +ReadAsyncMsg *NewAsyncMsg(const char *terminator, /* token signalling EOT */ + long tlen, + size_t maxlen, /* maximum message length */ + size_t expectlen, /* if we expect a message, how long should it be? */ + char *exist, /* if non-null, append to it; + exist is ALWAYS freed */ + long eLen, /* length of exist */ + int crlf /* CRLF newlines instead of LF */ + ) +{ + ReadAsyncMsg *NewMsg; + + NewMsg = (ReadAsyncMsg *)malloc(sizeof(ReadAsyncMsg)); + memset(NewMsg, 0, sizeof(ReadAsyncMsg)); + + if (exist == NULL) { + long len; + + if (expectlen == 0) { + len = 4 * SIZ; + } + else { + len = expectlen + 10; + } + NewMsg->MsgBuf = NewStrBufPlain(NULL, len); + } + else { + NewMsg->MsgBuf = NewStrBufPlain(exist, eLen); + free(exist); + } + /* Do we need to change leading ".." to "." for SMTP escaping? */ + if ((tlen == 1) && (*terminator == '.')) { + NewMsg->dodot = 1; + } + + NewMsg->terminator = terminator; + NewMsg->tlen = tlen; + + NewMsg->maxlen = maxlen; + + NewMsg->crlf = crlf; + + return NewMsg; +} + +/* + * Back end function used by CtdlMakeMessage() and similar functions + */ +eReadState CtdlReadMessageBodyAsync(AsyncIO *IO) +{ + ReadAsyncMsg *ReadMsg; + int MsgFinished = 0; + eReadState Finished = eMustReadMore; + + ReadMsg = IO->ReadMsg; + + /* read in the lines of message text one by one */ + do { + Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); + + switch (Finished) { + case eMustReadMore: /// read new from socket... + return Finished; + break; + case eBufferNotEmpty: /* shouldn't happen... */ + case eReadSuccess: /// done for now... + break; + case eReadFail: /// WHUT? + ///todo: shut down! + break; + } + + + if ((StrLength(IO->IOBuf) == ReadMsg->tlen) && + (!strcmp(ChrPtr(IO->IOBuf), ReadMsg->terminator))) { + MsgFinished = 1; + } + else if (!ReadMsg->flushing) { + /* Unescape SMTP-style input of two dots at the beginning of the line */ + if ((ReadMsg->dodot) && + (StrLength(IO->IOBuf) == 2) && /* TODO: do we just unescape lines with two dots or any line? */ + (!strcmp(ChrPtr(IO->IOBuf), ".."))) + { + StrBufCutLeft(IO->IOBuf, 1); + } + + if (ReadMsg->crlf) { + StrBufAppendBufPlain(IO->IOBuf, HKEY("\r\n"), 0); + } + else { + StrBufAppendBufPlain(IO->IOBuf, HKEY("\n"), 0); + } + + StrBufAppendBuf(ReadMsg->MsgBuf, IO->IOBuf, 0); + } + + /* if we've hit the max msg length, flush the rest */ + if (StrLength(ReadMsg->MsgBuf) >= ReadMsg->maxlen) ReadMsg->flushing = 1; + + } while (!MsgFinished); + + if (MsgFinished) + return eReadSuccess; + else + return eAbort; +} + /* * Back end function used by CtdlMakeMessage() and similar functions diff --git a/citadel/msgbase.h b/citadel/msgbase.h index b2a8bccbd..20c6ee76e 100644 --- a/citadel/msgbase.h +++ b/citadel/msgbase.h @@ -2,7 +2,7 @@ #ifndef MSGBASE_H #define MSGBASE_H - +#include "event_client.h" enum { MSGS_ALL, MSGS_OLD, @@ -107,7 +107,7 @@ void cmd_opna (char *cmdbuf); void cmd_dlat (char *cmdbuf); long send_message (struct CtdlMessage *); void loadtroom (void); -long CtdlSubmitMsg(struct CtdlMessage *, struct recptypes *, char *, int); +long CtdlSubmitMsg(struct CtdlMessage *, struct recptypes *, const char *, int); void quickie_message (const char *, const char *, char *, char *, const char *, int, const char *); void cmd_ent0 (char *entargs); void cmd_dele (char *delstr); @@ -237,5 +237,25 @@ int CtdlIsMe(char *addr, int addr_buf_len); void aide_message(char *text, char *subject) __attribute__ ((deprecated)); +/* + * loading messages async via an FD: + * add IO->ReadMsg = NewAsyncMsg(...) + * and then call CtdlReadMessageBodyAsync() from your linreader handler. + */ + +ReadAsyncMsg *NewAsyncMsg(const char *terminator, /* token signalling EOT */ + long tlen, + size_t expectlen, /* if we expect a message, how long should it be? */ + size_t maxlen, /* maximum message length */ + char *exist, /* if non-null, append to it; + exist is ALWAYS freed */ + long eLen, /* length of exist */ + int crlf /* CRLF newlines instead of LF */ + ); + +eReadState CtdlReadMessageBodyAsync(AsyncIO *IO); +void DeleteAsyncMsg(ReadAsyncMsg **Msg); + + #endif /* MSGBASE_H */ -- 2.30.2