h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
h->IO = IO;
h->EvAttch = CB;
- ev_cleanup_init(&IO->abort_by_shutdown,
+ ev_cleanup_init(&IO->db_abort_by_shutdown,
IO_abort_shutdown_callback);
- IO->abort_by_shutdown.data = IO;
- ev_cleanup_start(event_db, &IO->abort_by_shutdown);
+ IO->db_abort_by_shutdown.data = IO;
+ ev_cleanup_start(event_db, &IO->db_abort_by_shutdown);
citthread_mutex_lock(&DBEventQueueMutex);
CtdlLogPrintf(CTDL_DEBUG, "DBEVENT Q\n");
become_session(Ctx);
CtdlLogPrintf(CTDL_DEBUG, "DBEVENT\n");
- ev_cleanup_stop(event_db, &IO->abort_by_shutdown);
+ ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
assert(IO->Terminate);
IO->Terminate(IO);
CtdlLogPrintf(CTDL_DEBUG, "event: %s\n", __FUNCTION__);
become_session(IO->CitContext);
- ev_idle_stop(event_db, &IO->unwind_stack);
+ ev_idle_stop(event_db, &IO->db_unwind_stack);
- assert(IO->ReadDone);
- switch (IO->ReadDone(IO))
+ assert(IO->NextDBOperation);
+ switch (IO->NextDBOperation(IO))
{
+ case eDBQuery:
+ break;
+ case eSendDNSQuery:
+ case eReadDNSReply:
+ case eConnect:
+ case eSendReply:
+ case eSendMore:
+ case eReadMessage:
+ case eReadMore:
+ case eReadPayload:
+ ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
+ break;
+ case eTerminateConnection:
case eAbort:
ShutDownDBCLient(IO);
- default:
- break;
}
}
-void NextDBOperation(AsyncIO *IO, IO_CallBack CB)
+eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
{
- IO->ReadDone = CB;
- ev_idle_init(&IO->unwind_stack,
+ IO->NextDBOperation = CB;
+ ev_idle_init(&IO->db_unwind_stack,
DB_PerformNext);
- IO->unwind_stack.data = IO;
- ev_idle_start(event_db, &IO->unwind_stack);
+ IO->db_unwind_stack.data = IO;
+ ev_idle_start(event_db, &IO->db_unwind_stack);
+ return eDBQuery;
}
/*--------------------------------------------------------------------------------
become_session(IO->CitContext);
- while ((Finished == eBufferNotEmpty) && (IO->NextState == eReadMessage)){
+ while ((Finished == eBufferNotEmpty) &&
+ ((IO->NextState == eReadMessage)||
+ (IO->NextState == eReadMore)||
+ (IO->NextState == eReadPayload)))
+ {
if (IO->RecvBuf.nBlobBytesWanted != 0) {
}
}
}
-
- if ((IO->NextState == eSendReply) ||
- (IO->NextState == eSendMore))
- {
+ switch (IO->NextState) {
+ case eSendReply:
+ case eSendMore:
assert(IO->SendDone);
IO->NextState = IO->SendDone(IO);
ev_io_start(event_base, &IO->send_event);
- }
- else if ((IO->NextState == eTerminateConnection) ||
- (IO->NextState == eAbort) )
+ break;
+ case eReadPayload:
+ case eReadMore:
+ ev_io_start(event_base, &IO->recv_event);
+ break;
+ case eTerminateConnection:
+ case eAbort:
ShutDownCLient(IO);
+ break;
+ case eSendDNSQuery:
+ case eReadDNSReply:
+ case eDBQuery:
+ case eConnect:
+ case eReadMessage:
+ break;
+ }
return Finished;
}
AsyncIO *IO = watcher->data;
become_session(IO->CitContext);
+#ifdef BIGBAD_IODBG
+ {
+ int rv = 0;
+ char fn [SIZ];
+ FILE *fd;
+ const char *pch = ChrPtr(IO->SendBuf.Buf);
+ const char *pchh = IO->SendBuf.ReadWritePointer;
+ long nbytes;
+
+ if (pchh == NULL)
+ pchh = pch;
+
+ nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
+ snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
+ ((CitContext*)(IO->CitContext))->ServiceName,
+ IO->SendBuf.fd);
+
+ fd = fopen(fn, "a+");
+ fprintf(fd, "Read: BufSize: %ld BufContent: [",
+ nbytes);
+ rv = fwrite(pchh, nbytes, 1, fd);
+ if (!rv) printf("failed to write debug to %s!\n", fn);
+ fprintf(fd, "]\n");
+
+
+ fclose(fd);
+ }
+#endif
rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
if (rc == 0)
pchh = pch;
nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
- snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", "smtpev", IO->SendBuf.fd);
+ snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
+ ((CitContext*)(IO->CitContext))->ServiceName,
+ IO->SendBuf.fd);
fd = fopen(fn, "a+");
fprintf(fd, "Read: BufSize: %ld BufContent: [",
nbytes);
rv = fwrite(pchh, nbytes, 1, fd);
- if (!rv) printf("failed to write debug!");
+ if (!rv) printf("failed to write debug to %s!\n", fn);
fprintf(fd, "]\n");
#endif
ev_io_stop(event_base, &IO->send_event);
switch (IO->NextState) {
- case eSendReply:
- break;
case eSendMore:
assert(IO->SendDone);
IO->NextState = IO->SendDone(IO);
ev_io_start(event_base, &IO->send_event);
}
break;
+ case eSendReply:
+ if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
+ break;
+ IO->NextState = eReadMore;
+ case eReadMore:
case eReadMessage:
+ case eReadPayload:
if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
HandleInbound(IO);
}
ev_io_start(event_base, &IO->recv_event);
}
+ break;
+ case eDBQuery:
+ /* we now live in another queue, so we have to unregister. */
+ ev_cleanup_stop(loop, &IO->abort_by_shutdown);
break;
case eSendDNSQuery:
case eReadDNSReply:
{
switch(IO->NextState) {
+ case eReadMore:
case eReadMessage:
ev_io_start(event_base, &IO->recv_event);
break;
case eSendReply:
case eSendMore:
+ case eReadPayload:
become_session(IO->CitContext);
IO_send_callback(loop, &IO->send_event, revents);
break;
+ case eDBQuery:
case eSendDNSQuery:
case eReadDNSReply:
case eConnect:
AsyncIO *IO = watcher->data;
nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+#ifdef BIGBAD_IODBG
+ {
+ int rv = 0;
+ char fn [SIZ];
+ FILE *fd;
+ const char *pch = ChrPtr(IO->RecvBuf.Buf);
+ const char *pchh = IO->RecvBuf.ReadWritePointer;
+ long nbytes;
+
+ if (pchh == NULL)
+ pchh = pch;
+
+ nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
+ snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
+ ((CitContext*)(IO->CitContext))->ServiceName,
+ IO->SendBuf.fd);
+
+ fd = fopen(fn, "a+");
+ fprintf(fd, "Read: BufSize: %ld BufContent: [",
+ nbytes);
+ rv = fwrite(pchh, nbytes, 1, fd);
+ if (!rv) printf("failed to write debug to %s!\n", fn);
+ fprintf(fd, "]\n");
+
+
+ fclose(fd);
+ }
+#endif
if (nbytes > 0) {
HandleInbound(IO);
} else if (nbytes == 0) {
}
return event_connect_socket(IO, conn_timeout, first_rw_timeout);
}
+
+eNextState ReAttachIO(AsyncIO *IO,
+ void *pData,
+ int ReadFirst)
+{
+ IO->Data = pData;
+ become_session(IO->CitContext);
+ ev_cleanup_start(event_base, &IO->abort_by_shutdown);
+ if (ReadFirst) {
+ IO->NextState = eReadMessage;
+ }
+ else {
+ IO->NextState = eSendReply;
+ }
+ set_start_callback(event_base, IO, 0);
+
+ return IO->NextState;
+}
typedef enum _eNextState {
eSendDNSQuery,
eReadDNSReply,
+ eDBQuery,
eConnect,
eSendReply,
eSendMore,
eReadMessage,
eReadMore,
+ eReadPayload,
eTerminateConnection,
eAbort
}eNextState;
typedef void (*ParseDNSAnswerCb)(AsyncIO*, unsigned char*, int);
typedef void (*FreeDNSReply)(void *DNSData);
+
+typedef struct __ReadAsyncMsg {
+ StrBuf *MsgBuf;
+ size_t maxlen; /* maximum message length */
+
+ const char *terminator; /* token signalling EOT */
+ long tlen;
+ int dodot;
+
+ int flushing; /* if we read maxlen, read until nothing more arives and ignore this. */
+
+ int crlf; /* CRLF newlines instead of LF */
+} ReadAsyncMsg;
+
+
typedef struct _DNSQueryParts {
ParseDNSAnswerCb DNS_CB;
IO_CallBack PostDNS;
RecvBuf;
/* our events... */
- ev_cleanup abort_by_shutdown; /* server wants to go down... */
+ ev_cleanup abort_by_shutdown, /* server wants to go down... */
+ db_abort_by_shutdown; /* server wants to go down... */
ev_timer conn_fail, /* connection establishing timed out */
rw_timeout; /* timeout while sending data */
ev_idle unwind_stack, /* get c-ares out of the stack */
+ db_unwind_stack, /* wait for next db operation... */
conn_fail_immediate; /* unwind stack, but fail immediately. */
ev_io recv_event, /* receive data from the client */
send_event, /* send more data to the client */
Terminate, /* shutting down... */
Timeout, /* Timeout handler; may also be connection timeout */
ConnFail, /* What to do when one connection failed? */
- ShutdownAbort;/* we're going down. make your piece. */
+ ShutdownAbort,/* we're going down. make your piece. */
+ NextDBOperation; /* Perform Database IO */
IO_LineReaderCallback LineReader; /* if we have linereaders, maybe we want to read more lines before the real application logic is called? */
evcurl_request_data HttpReq;
/* Saving / loading a message async from / to disk */
-
+ ReadAsyncMsg *ReadMsg;
struct CtdlMessage *AsyncMsg;
struct recptypes *AsyncRcp;
/* Custom data; its expected to contain AsyncIO so we can save malloc()s... */
void FreeAsyncIOContents(AsyncIO *IO);
-void NextDBOperation(AsyncIO *IO, IO_CallBack CB);
+eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB);
int QueueDBOperation(AsyncIO *IO, IO_CallBack CB);
int QueueEventContext(AsyncIO *IO, IO_CallBack CB);
int ShutDownEventQueue(void);
IO_CallBack CallBack,
IO_CallBack Terminate);
+eNextState ReAttachIO(AsyncIO *IO,
+ void *pData,
+ int ReadFirst);
+
void evcurl_handle_start(AsyncIO *IO);
#endif /* __EVENT_CLIENT_H__ */