/*
*
- * Copyright (c) 1998-2009 by the citadel.org team
+ * Copyright (c) 1998-2012 by the citadel.org team
*
* This program is open source software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
#include "event_client.h"
-static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents)
+static void IO_abort_shutdown_callback(struct ev_loop *loop,
+ ev_cleanup *watcher,
+ int revents)
{
AsyncIO *IO = watcher->data;
EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
}
-/*--------------------------------------------------------------------------------
- * Server DB IO
- */
+/*------------------------------------------------------------------------------
+ * Server DB IO
+ *----------------------------------------------------------------------------*/
extern int evdb_count;
extern pthread_mutex_t DBEventQueueMutex;
extern HashList *DBInboundEventQueue;
extern struct ev_loop *event_db;
-extern ev_async DBAddJob;
+extern ev_async DBAddJob;
extern ev_async DBExitEventLoop;
eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
h->IO = IO;
h->EvAttch = CB;
- ev_cleanup_init(&IO->db_abort_by_shutdown,
+ ev_cleanup_init(&IO->db_abort_by_shutdown,
IO_abort_shutdown_callback);
IO->db_abort_by_shutdown.data = IO;
ev_cleanup_start(event_db, &IO->db_abort_by_shutdown);
ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
assert(IO->Terminate);
- IO->Terminate(IO);
-
- Ctx->state = CON_IDLE;
- Ctx->kill_me = 1;
+ IO->Terminate(IO);
}
void
AsyncIO *IO = watcher->data;
EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
become_session(IO->CitContext);
-
+
ev_idle_stop(event_db, &IO->db_unwind_stack);
assert(IO->NextDBOperation);
case eSendDNSQuery:
case eReadDNSReply:
case eConnect:
- case eSendReply:
+ case eSendReply:
case eSendMore:
case eSendFile:
- case eReadMessage:
+ case eReadMessage:
case eReadMore:
case eReadPayload:
case eReadFile:
return eDBQuery;
}
-/*--------------------------------------------------------------------------------
- * Client IO
- */
+/*------------------------------------------------------------------------------
+ * Client IO
+ *----------------------------------------------------------------------------*/
extern int evbase_count;
extern pthread_mutex_t EventQueueMutex;
extern HashList *InboundEventQueue;
extern struct ev_loop *event_base;
-extern ev_async AddJob;
+extern ev_async AddJob;
extern ev_async ExitEventLoop;
h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
h->IO = IO;
h->EvAttch = CB;
- ev_cleanup_init(&IO->abort_by_shutdown,
+ ev_cleanup_init(&IO->abort_by_shutdown,
IO_abort_shutdown_callback);
IO->abort_by_shutdown.data = IO;
ev_cleanup_start(event_base, &IO->abort_by_shutdown);
return eSendReply;
}
+void DestructCAres(AsyncIO *IO);
void FreeAsyncIOContents(AsyncIO *IO)
{
+ CitContext *Ctx = IO->CitContext;
+
FreeStrBuf(&IO->IOBuf);
FreeStrBuf(&IO->SendBuf.Buf);
FreeStrBuf(&IO->RecvBuf.Buf);
+
+ DestructCAres(IO);
+
+ FreeURL(&IO->ConnectMe);
+ FreeStrBuf(&IO->HttpReq.ReplyData);
+
+ if (Ctx) {
+ Ctx->state = CON_IDLE;
+ Ctx->kill_me = 1;
+ }
}
void StopClientWatchers(AsyncIO *IO)
{
+ ev_timer_stop (event_base, &IO->rw_timeout);
ev_timer_stop(event_base, &IO->conn_fail);
- ev_io_stop(event_base, &IO->conn_event);
ev_idle_stop(event_base, &IO->unwind_stack);
+ ev_io_stop(event_base, &IO->conn_event);
ev_io_stop(event_base, &IO->send_event);
ev_io_stop(event_base, &IO->recv_event);
- ev_timer_stop (event_base, &IO->rw_timeout);
close(IO->SendBuf.fd);
IO->SendBuf.fd = 0;
IO->RecvBuf.fd = 0;
}
assert(IO->Terminate);
IO->Terminate(IO);
- Ctx->state = CON_IDLE;
- Ctx->kill_me = 1;
}
-
eReadState HandleInbound(AsyncIO *IO)
{
const char *Err = NULL;
eReadState Finished = eBufferNotEmpty;
-
+
become_session(IO->CitContext);
- while ((Finished == eBufferNotEmpty) &&
+ while ((Finished == eBufferNotEmpty) &&
((IO->NextState == eReadMessage)||
(IO->NextState == eReadMore)||
(IO->NextState == eReadFile)||
(IO->NextState == eReadPayload)))
{
- if (IO->RecvBuf.nBlobBytesWanted != 0) {
-
- }
- else { /* Reading lines... */
-//// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP
- if ((IO->NextState == eReadFile) &&
- (Finished == eBufferNotEmpty))
+ /* Reading lines...
+ * lex line reply in callback,
+ * or do it ourselves.
+ * i.e. as nnn-blabla means continue reading in SMTP
+ */
+ if ((IO->NextState == eReadFile) &&
+ (Finished == eBufferNotEmpty))
+ {
+ Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
+ if (Finished == eReadSuccess)
{
- Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
- if (Finished == eReadSuccess)
- {
- IO->NextState = eSendReply;
- }
- }
- else if (IO->LineReader)
- Finished = IO->LineReader(IO);
- else
- Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
-
- switch (Finished) {
- case eMustReadMore: /// read new from socket...
- break;
- case eBufferNotEmpty: /* shouldn't happen... */
- case eReadSuccess: /// done for now...
- break;
- case eReadFail: /// WHUT?
- ///todo: shut down!
- break;
+ IO->NextState = eSendReply;
}
-
}
-
+ else if (IO->LineReader)
+ Finished = IO->LineReader(IO);
+ else
+ Finished = StrBufChunkSipLine(IO->IOBuf,
+ &IO->RecvBuf);
+
+ switch (Finished) {
+ case eMustReadMore: /// read new from socket...
+ break;
+ case eBufferNotEmpty: /* shouldn't happen... */
+ case eReadSuccess: /// done for now...
+ break;
+ case eReadFail: /// WHUT?
+ ///todo: shut down!
+ break;
+ }
+
if (Finished != eMustReadMore) {
assert(IO->ReadDone);
ev_io_stop(event_base, &IO->recv_event);
const char *pch = ChrPtr(IO->SendBuf.Buf);
const char *pchh = IO->SendBuf.ReadWritePointer;
long nbytes;
-
+
if (pchh == NULL)
pchh = pch;
-
+
nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
((CitContext*)(IO->CitContext))->ServiceName,
IO->SendBuf.fd);
-
+
fd = fopen(fn, "a+");
fprintf(fd, "Send: BufSize: %ld BufContent: [",
nbytes);
StrBufPlain(IO->ErrMsg, errmsg, -1);
break;
default:
- rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+ rc = StrBuf_write_one_chunk_callback(watcher->fd,
+ 0/*TODO*/,
+ &IO->SendBuf);
}
#ifdef BIGBAD_IODBG
case eDBQuery:
case eConnect:
break;
- case eSendReply:
+ case eSendReply:
case eSendMore:
case eSendFile:
- ev_io_start(event_base, &IO->send_event);
+ ev_io_start(event_base,
+ &IO->send_event);
break;
- case eReadMessage:
+ case eReadMessage:
case eReadMore:
case eReadPayload:
case eReadFile:
}
break;
case eSendReply:
- if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
+ if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
break;
IO->NextState = eReadMore;
case eReadMore:
case eReadMessage:
case eReadPayload:
case eReadFile:
- if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
+ if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
+ {
HandleInbound(IO);
}
else {
break;
case eDBQuery:
- /* we now live in another queue, so we have to unregister. */
+ /*
+ * we now live in another queue,
+ * so we have to unregister.
+ */
ev_cleanup_stop(loop, &IO->abort_by_shutdown);
break;
case eSendDNSQuery:
static void
set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
{
-
switch(IO->NextState) {
case eReadMore:
case eReadMessage:
}
assert(IO->Timeout);
- switch (IO->Timeout(IO))
+ switch (IO->Timeout(IO))
{
case eAbort:
ShutDownCLient(IO);
become_session(IO->CitContext);
assert(IO->ConnFail);
- switch (IO->ConnFail(IO))
+ switch (IO->ConnFail(IO))
{
case eAbort:
ShutDownCLient(IO);
}
static void
-IO_connfailimmediate_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
+IO_connfailimmediate_callback(struct ev_loop *loop,
+ ev_idle *watcher,
+ int revents)
{
AsyncIO *IO = watcher->data;
become_session(IO->CitContext);
assert(IO->ConnFail);
- switch (IO->ConnFail(IO))
+ switch (IO->ConnFail(IO))
{
case eAbort:
ShutDownCLient(IO);
nbytes = FileRecvChunked(&IO->IOB, &errmsg);
if (nbytes < 0)
StrBufPlain(IO->ErrMsg, errmsg, -1);
- else
+ else
{
if (IO->IOB.ChunkSendRemain == 0)
{
}
break;
default:
- nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+ nbytes = StrBuf_read_one_chunk_callback(watcher->fd,
+ 0 /*TODO */,
+ &IO->RecvBuf);
break;
}
#ifdef BIGBAD_IODBG
{
+ long nbytes;
int rv = 0;
char fn [SIZ];
FILE *fd;
const char *pch = ChrPtr(IO->RecvBuf.Buf);
const char *pchh = IO->RecvBuf.ReadWritePointer;
- long nbytes;
-
+
if (pchh == NULL)
pchh = pch;
-
+
nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
- ((CitContext*)(IO->CitContext))->ServiceName,
+ ((CitContext*)(IO->CitContext))->ServiceName,
IO->SendBuf.fd);
-
+
fd = fopen(fn, "a+");
fprintf(fd, "Read: BufSize: %ld BufContent: [",
nbytes);
rv = fwrite(pchh, nbytes, 1, fd);
if (!rv) printf("failed to write debug to %s!\n", fn);
fprintf(fd, "]\n");
-
-
fclose(fd);
}
#endif
}
return;
} else if (nbytes == -1) {
-/// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno);
- EV_syslog(LOG_DEBUG,
+ // FD is gone. kick it.
+ StopClientWatchers(IO);
+ EV_syslog(LOG_DEBUG,
"EVENT: Socket Invalid! %s \n",
strerror(errno));
ShutDownCLient(IO);
IO->rw_timeout.data = IO;
- /* Bypass it like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
-/// ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = inet_addr("127.0.0.1");
+ /* for debugging you may bypass it like this:
+ * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
+ * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
+ * inet_addr("127.0.0.1");
+ */
if (IO->ConnectMe->IPv6)
rc = connect(IO->SendBuf.fd,
&IO->ConnectMe->Addr,